stream: Working With Data Streams using Connections and Web Services (2024)

Michael Hahsler

Data streams are often processed in a distributed manner using multiple machines or multiple processes. For example, a data stream may be produced by a sensor attached to a remote machine or multiple clustering algorithms run in parallel using several R processes. Another application is to connect to other software components in a stream mining pipeline.

First, we show how socket connections together with the package stream can be used to connect multiple processes or machines.

Then we give examples of how package streamConnect makes connecting stream mining components more convenient by providing an interface to connect stream processing using sockets or web services. While sockets are only used to connect data steam generating processes, web services are more versatile and can also be used to create data stream clustering processes as a service.

The last section of this paper shows how to deploy the server/web service.

For the examples below, we will use a random available port.

port <- httpuv::randomPort()port
## [1] 18335

The functions write_stream() and the class DSD_ReadStream provided in package stream can be used for communicate via connections (files, sockets, URLs, etc.). In the first example, we manually set up the connection. The example is useful to understand how sockets work especially for users interested in implementing their own components using other programming languages or connecting with other data stream software.

A more convenient way to do this using package streamConnect is described later in this paper.

Server: Serving a Data Stream

The server served data from a data stream. We use library callr to create a separate R process that serves a data stream creating 10 points every second using a socket connection, but you can also put the code in function r_bg() in a file called server.R and run (potentially on a different machine) it with R CMD BATCH server.R from the command line.

library(stream)
## Loading required package: magrittr
library(callr)rp1 <- r_bg(function(port) { library(stream) stream <- DSD_Gaussians(k = 3, d = 3) blocksize <- 10 con <- socketConnection(port = port, server = TRUE)  while (TRUE) { write_stream(stream, con, n = blocksize, close = FALSE) Sys.sleep(1) }  close(con)},  args = list(port = port))rp1
## PROCESS 'R', running, pid 107885.

Client: Reading from the Stream

The client consumes the data stream. We open the connection which starts the data generating process. retry is used to make sure the server connections are established.

## A connection with ## description "->localhost:18335"## class "sockconn" ## mode "r" ## text "text" ## opened "opened" ## can read "yes" ## can write "yes"
dsd <- retry(DSD_ReadStream(con))

We poll all available data (n = -1) several times. The first request should yield 10 points, the second none and the third request should yield 20 points (2 seconds).

get_points(dsd, n= -1)
## V1 V2 V3## 1 0.9763490 0.282770005 0.4335789## 2 0.7613053 0.032527481 0.5439431## 3 0.9517728 0.257080520 0.3972279## 4 0.3540039 0.132902499 0.8548259## 5 0.2954169 0.148716075 0.9091560## 6 0.8452791 0.021557911 0.6226094## 7 0.3407156 0.091262391 0.8557170## 8 0.2798403 0.132015442 0.9297309## 9 0.7966965 0.066396369 0.5475612## 10 0.8092318 0.054121759 0.5721670## 11 0.9293998 0.313864306 0.3937050## 12 0.9164651 0.239978452 0.3594898## 13 0.3077438 0.176170713 0.8878380## 14 0.8513389 0.082922283 0.5931865## 15 0.8980237 0.074432154 0.6549044## 16 0.8675014 0.009589531 0.6543514## 17 1.0034047 0.351236710 0.4538076## 18 0.7820864 0.054793312 0.5371526## 19 0.7586740 0.004629369 0.5345322## 20 0.9155328 0.303497505 0.4078064
get_points(dsd, n= -1)
## [1] V1 V2 V3## <0 rows> (or 0-length row.names)
Sys.sleep(2)get_points(dsd, n= -1)
## V1 V2 V3## 1 1.0231324 0.38512862 0.4920796## 2 1.0006916 0.33869209 0.4602129## 3 0.3219693 0.17329179 0.8946196## 4 0.9953849 0.33090921 0.4415728## 5 0.9007681 0.07821096 0.6488596## 6 0.9265408 0.29273446 0.3892107## 7 0.7818012 0.02566004 0.5517653## 8 0.3575518 0.12841383 0.8681639## 9 0.2710216 0.15043043 0.9259801## 10 0.2767183 0.24266620 0.9057776## 11 0.2530986 0.12877020 0.9322407## 12 0.7646118 0.01092075 0.5492191## 13 0.2918896 0.10588315 0.9150653## 14 0.9831308 0.33799476 0.4265356## 15 0.8074047 0.07398532 0.5577206## 16 0.2388177 0.18990076 0.9577896## 17 0.7442406 0.02404179 0.5204524## 18 0.2940370 0.12147545 0.9276197## 19 0.7884460 0.04131562 0.5570252## 20 0.2972245 0.16459369 0.8991799
close(con)

Server: Stoping the Server Process

Here we stop the callr process. Note that the socket connection is still active and will serve the data in the connection buffer as long as the reading process keeps the connection open.

rp1$kill()
## [1] TRUE

streamConnect provides publish DSD_via_Socket() and the class DSD_ReadSocket to create a socked based connection. DSD_via_Socket() creates a socket broadcasting data using a socket and DSD_ReadSocket creates a DSD object reading from that socket.

Server: Publish Data

We create a DSD process sending data to the port.

library(streamConnect)rp1 <- DSD_Gaussians(k = 3, d = 3) %>% publish_DSD_via_Socket(port = port)rp1
## PROCESS 'R', running, pid 107951.
Sys.sleep(1) # wait for the socket to become available

Client: Connect to the Data Stream

Next, we create a DSD that connects to the socket.

library(streamConnect)dsd <- retry(DSD_ReadSocket(port = port, col.names = c("x", "y", "z", ".class")))dsd
## Data Stream from Connection (d = 3, k = NA) ## Class: DSD_ReadStream, DSD_R, DSD ## connection: ->localhost:18335 (opened)
get_points(dsd, n = 10)
## x y z .class## 1 0.3722906 0.5157614 0.7597608 3## 2 0.4513325 0.9251469 0.9240499 1## 3 0.3038088 0.4979978 0.8274904 3## 4 0.5917166 0.6450462 0.9124032 2## 5 0.5159112 0.6127018 0.9994102 2## 6 0.5036040 0.9903226 0.8752367 1## 7 0.2634123 0.5456346 0.8202517 3## 8 0.6027405 0.6346192 0.9666301 2## 9 0.3958547 0.9336184 0.9565808 1## 10 0.4925691 0.9102912 0.9499309 1
plot(dsd)

stream: Working With Data Streams using Connections and Web Services (1)

close_stream(dsd)

Server: Stoping the Server Process

Closing the stream on the client also closes the connection which may already kill the serving process.

if (rp1$is_alive()) rp1$kill()

Web services are more versatile, they can be used to deploy data stream sources using publish_DSD_via_WebService()/DSD_ReadWebservice or data stream tasks using publish_DSC_via_WebService()/DSC_WebService. Here we only show how to deploy a clusterer, but a DSD can be published in a similar manner. Larger workflows can be created using DST_Runner from stream.

streamConnect uses the package plumber to manage web services. The data is transmitted in serialized form. The default serialization format it csv (comma separated values). Other formats are json and rds (see plumber::serializer_csv).

Server: Create a Web Service

Creating a clustering web service process listening for data on the port.

library(streamConnect)rp1 <- publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", port = port)rp1
## PROCESS 'R', running, pid 108014.

Client: Connect to the Web Service

Connect to the web service with a local DSC interface.

library(streamConnect)dsc <- DSC_WebService(paste0("http://localhost", ":", port), quiet = FALSE)
## Error in curl::curl_fetch_memory(url, handle = handle): Failed to connect to localhost port 18335 after 0 ms: Connection refused## Request failed [ERROR]. Retrying in 1.2 seconds...
dsc
## Web Service Data Stream Clusterer: DBSTREAM## Served from: http://localhost:18335 ## Class: DSC_WebService, DSC_R, DSC ## Number of micro-clusters: 0 ## Number of macro-clusters: 0

Cluster some data.

dsd <- DSD_Gaussians(k = 3, d = 2, noise = 0.05)update(dsc, dsd, 500)dsc
## Web Service Data Stream Clusterer: DBSTREAM## Served from: http://localhost:18335 ## Class: DSC_WebService, DSC_R, DSC ## Number of micro-clusters: 27 ## Number of macro-clusters: 3
get_centers(dsc)
## # A tibble: 27 × 2## X1 X2## <dbl> <dbl>## 1 0.510 0.874 ## 2 0.542 0.810 ## 3 0.226 0.346 ## 4 0.493 0.0799## 5 0.253 0.301 ## 6 0.579 0.852 ## 7 0.165 0.374 ## 8 0.423 0.120 ## 9 0.548 0.116 ## 10 0.611 0.815 ## # ℹ 17 more rows
get_weights(dsc)
## [1] 41.585760 54.660767 47.184834 50.456876 66.892559 56.545707 11.792716## [8] 7.610958 15.462299 31.456354 7.122277 56.552766 46.293763 10.706963## [15] 13.645617 34.949518 62.088009 3.474951 10.227555 11.866366 39.086235## [22] 21.072941 3.532982 41.542378 6.161317 12.371493 6.821655
plot(dsc)

stream: Working With Data Streams using Connections and Web Services (2)

Server: Stop the Web Service

Kill the web service process.

rp1$kill()
## [1] TRUE

Web services and the socket-based server can be easily deployed to any server or cloud system including containers. Make sure R and the package streamConnect and all dependencies are installed. Create a short R script to start the server/service and deploy it.

library(streamConnect)port = 8001publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", port = port,  background = FALSE)

Web services can also be deployed using a plumber task file. The following call returns the name of the task file.

publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", serve = FALSE)

Open the file in R studio to deploy it or read the plumber Hosting vignette.

stream: Working With Data Streams using Connections and Web Services (2024)
Top Articles
Latest Posts
Article information

Author: Prof. An Powlowski

Last Updated:

Views: 6470

Rating: 4.3 / 5 (44 voted)

Reviews: 83% of readers found this page helpful

Author information

Name: Prof. An Powlowski

Birthday: 1992-09-29

Address: Apt. 994 8891 Orval Hill, Brittnyburgh, AZ 41023-0398

Phone: +26417467956738

Job: District Marketing Strategist

Hobby: Embroidery, Bodybuilding, Motor sports, Amateur radio, Wood carving, Whittling, Air sports

Introduction: My name is Prof. An Powlowski, I am a charming, helpful, attractive, good, graceful, thoughtful, vast person who loves writing and wants to share my knowledge and understanding with you.