This post is about experimental software. It is not ready for public use. All code examples and APIs in this post are subject to change without warning.
This post describes a prototype project to handle continuous sources of tabular data using Pandas and Streamz.
Some data never stops. It arrives continuously in a constant, never-ending stream. This happens in financial time series, web server logs, scientific instruments, IoT telemetry, and more. Algorithms to handle this data are slightly different from what you find in libraries like NumPy and Pandas, which assume that they know all of the data up-front. It's still possible to use NumPy and Pandas, but you need to combine them with some cleverness and keep enough intermediate data around to compute marginal updates when new data comes in.
For example, imagine that we have a continuous stream of CSV files arriving and we want to print out the mean of our data over time. Whenever a new CSV file arrives we need to recompute the mean of the entire dataset. If we're clever we keep around enough state so that we can compute this mean without looking back over the rest of our historical data. We can accomplish this by keeping running totals and running counts as follows:
```python
import pandas as pd

total = 0
count = 0
for filename in filenames:          # filenames is an infinite iterator
    df = pd.read_csv(filename)
    total = total + df.sum()
    count = count + df.count()
    mean = total / count
    print(mean)
```
Now as we add new files to our filenames iterator our code prints out new means that are updated over time. We don't have a single mean result; we have a continuous stream of mean results, each valid for the data up to that point. Our output data is an infinite stream, just like our input data.
When our computations are linear and straightforward like this a for loop suffices. However when our computations have several streams branching out or converging, possibly with rate limiting or buffering between them, this for-loop approach can grow complex and difficult to manage.
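To make that concrete, here is a hand-rolled sketch (plain Python, variable names are mine) of one input feeding two branches, a running mean and a rolling mean, where each branch needs its own state threaded through the loop:

```python
# Hand-rolled branching: one input stream, two downstream computations.
# Each branch carries its own state, and every new branch adds bookkeeping.
data = [1.0, 2.0, 3.0]           # stands in for an infinite data source

total, count = 0.0, 0            # branch 1 state: running mean
recent = []                      # branch 2 state: rolling window
running_means, rolling_means = [], []

for x in data:
    total += x                   # update branch 1
    count += 1
    running_means.append(total / count)

    recent = (recent + [x])[-2:]  # update branch 2: keep the last 2 values
    rolling_means.append(sum(recent) / len(recent))
```

With two branches this is already awkward; add buffering or rate limiting between stages and the loop body balloons, which is exactly the situation described above.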
A few months ago I pushed a small library called streamz, which handled control flow for pipelines, including linear map operations, operations that accumulated state, branching, joining, as well as back pressure, flow control, feedback, and so on. Streamz was designed to handle all of the movement of data and signaling of computation at the right time. This library was quietly used by a couple of groups and now feels fairly clean and useful.
Streamz was designed to handle the control flow of such a system, but did nothing to help you with streaming algorithms. Over the past week I've been building a dataframe module on top of streamz to help with common streaming tabular data situations. This module uses Pandas and implements a subset of the Pandas API, so hopefully it will be easy to use for programmers with existing Python knowledge.
Our example above could be written as follows with streamz:
```python
source = Stream.filenames('path/to/dir/*.csv')  # stream of filenames
sdf = (source.map(pd.read_csv)                  # stream of Pandas dataframes
             .to_dataframe(example=...))        # logical streaming dataframe

sdf.mean().stream.sink(print)                   # printed stream of mean values
```
This example is no more clear than the for-loop version. On its own this is probably a worse solution than what we had before, just because it involves new technology. However it starts to become useful in two situations:
During development we've found it very useful to have live updating outputs in Jupyter.
Usually when we evaluate code in Jupyter we have static inputs and static outputs:
However now both our inputs and our outputs are live:
We accomplish this using a combination of ipywidgets and Bokeh plots, both of which provide nice hooks to change previous Jupyter outputs and work well with the Tornado IOLoop (streamz, Bokeh, Jupyter, and Dask all use Tornado for concurrency). We're able to build nicely responsive feedback whenever things change.
In the following example we build our CSV to dataframe pipeline that updates whenever new files appear in a directory. Whenever we drag files to the data directory on the left we see that all of our outputs update on the right.
This project is very young and could use some help. There are plenty of holes in the API. That being said, the following works well:
- `sdf['z'] = sdf.x + sdf.y`
- `sdf = sdf[sdf.z > 2]`
- Rolling reductions by number of rows or time window
- Real time plotting with Bokeh (one of my favorite features)
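As a rough illustration of what a rolling reduction by row count has to track internally, here is a pandas-only sketch (my own code, not the streamz implementation) that retains just the last `window` rows as state between batches:

```python
import pandas as pd

window = 3                       # rolling reduction over the last 3 rows
state = None                     # retained tail of the stream
rolling_sums = []

batches = [pd.DataFrame({'x': [1, 2]}),
           pd.DataFrame({'x': [3]}),
           pd.DataFrame({'x': [4, 5]})]

for batch in batches:            # each batch is one arriving dataframe
    state = batch if state is None else pd.concat([state, batch])
    state = state.tail(window)   # keep only the last `window` rows
    rolling_sums.append(state.x.sum())
```

A time-window variant would keep rows newer than a cutoff timestamp instead of a fixed row count, but the shape of the state is the same.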
But most importantly this needs use by people with real problems to help us understand what here is valuable and what is unpleasant.
Help would be welcome with any of this.
You can install this from GitHub:
```
pip install git+https://github.com/mrocklin/streamz.git
```
Documentation and code are here:
Current and upcoming work is focused on data ingestion from Kafka and parallelizing with Dask.