Oct 16, 2017

Streaming Dataframes

This work is supported by Anaconda Inc and the Data Driven Discovery Initiative from the Moore Foundation.

This post is about experimental software. It is not ready for public use. All code examples and APIs in this post are subject to change without warning.

Summary

This post describes a prototype project to handle continuous data sources of tabular data using Pandas and Streamz.

Introduction

Some data never stops. It arrives continuously in a constant, never-ending stream. This happens in financial time series, web server logs, scientific instruments, IoT telemetry, and more. Algorithms to handle this data are slightly different from what you find in libraries like NumPy and Pandas, which assume that they know all of the data up-front. It’s still possible to use NumPy and Pandas, but you need to combine them with some cleverness and keep enough intermediate data around to compute marginal updates when new data comes in.

Example: Streaming Mean

For example, imagine that we have a continuous stream of CSV files arriving and we want to print out the mean of our data over time. Whenever a new CSV file arrives we need to recompute the mean of the entire dataset. If we’re clever we keep around enough state so that we can compute this mean without looking back over the rest of our historical data. We can accomplish this by keeping running totals and running counts as follows:

import pandas as pd

total = 0
count = 0

for filename in filenames:          # filenames is an infinite iterator of CSV paths
    df = pd.read_csv(filename)
    total = total + df.sum()        # running column totals
    count = count + df.count()      # running row counts per column
    mean = total / count
    print(mean)

Now as we add new files to our filenames iterator our code prints out new means that are updated over time. We don’t have a single mean result; we have a continuous stream of mean results that are each valid for the data up to that point. Our output data is an infinite stream, just like our input data.

When our computations are linear and straightforward like this a for loop suffices. However when our computations have several streams branching out or converging, possibly with rate limiting or buffering between them, this for-loop approach can grow complex and difficult to manage.

Streamz

A few months ago I pushed a small library called streamz, which handled control flow for pipelines, including linear map operations, operations that accumulated state, branching, joining, as well as back pressure, flow control, feedback, and so on. Streamz was designed to handle all of the movement of data and signaling of computation at the right time. This library was quietly used by a couple of groups and now feels fairly clean and useful.
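
As a rough sketch of what that control flow looks like in plain streamz (the lambdas, values, and printed labels below are invented for illustration; map, accumulate, sink, and emit are the library's core primitives):

from streamz import Stream

source = Stream()

# a linear map operation feeding a stateful accumulation (a running sum)
running_sum = source.map(lambda x: x * 2).accumulate(lambda acc, x: acc + x, start=0)

# branching: the same source feeds two independent consumers
running_sum.sink(lambda total: print('running sum:', total))
source.sink(lambda x: print('raw value:', x))

# push data into the pipeline; each emit flows through every branch
for i in range(3):
    source.emit(i)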

Streamz was designed to handle the control flow of such a system, but did nothing to help you with streaming algorithms. Over the past week I’ve been building a dataframe module on top of streamz to help with common streaming tabular data situations. This module uses Pandas and implements a subset of the Pandas API, so hopefully it will be easy to use for programmers with existing Python knowledge.

Example: Streaming Mean

Our example above could be written as follows with streamz:

import pandas as pd
from streamz import Stream

source = Stream.filenames('path/to/dir/*.csv')   # stream of filenames
sdf = (source.map(pd.read_csv)                    # stream of Pandas dataframes
             .to_dataframe(example=...))          # logical streaming dataframe

sdf.mean().stream.sink(print)                     # printed stream of mean values

This example is no clearer than the for-loop version. On its own this is probably a worse solution than what we had before, just because it involves new technology. However it starts to become useful in two situations:

  1. You want to do more complex streaming algorithms

         sdf = sdf[sdf.name == 'Alice']
         sdf.x.groupby(sdf.y).mean().sink(print)

         # or

         sdf.x.rolling('300ms').mean()

     It would require more cleverness to build these algorithms with a for loop as above.

  2. You want to do multiple operations, deal with flow control, etc.

         sdf.mean().sink(print)
         sdf.x.sum().rate_limit(0.500).sink(write_to_database)
         ...

     Consistently branching off computations, routing data correctly, and handling time can all be challenging to accomplish consistently.

Jupyter Integration and Streaming Outputs

During development we’ve found it very useful to have live updating outputs in Jupyter.

Usually when we evaluate code in Jupyter we have static inputs and static outputs.

Now, however, both our inputs and our outputs are live.

We accomplish this using a combination of ipywidgets and Bokeh plots, both of which provide nice hooks to change previous Jupyter outputs and work well with the Tornado IOLoop (streamz, Bokeh, Jupyter, and Dask all use Tornado for concurrency). We’re able to build nicely responsive feedback whenever things change.
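
As a minimal sketch of the idea, assuming a live Jupyter session (the Label widget and the formatting lambda are illustrative choices, not necessarily the exact mechanism behind the recordings in this post):

import ipywidgets
from IPython.display import display
from streamz import Stream

source = Stream()
label = ipywidgets.Label(value='waiting for data...')
display(label)

# each value that flows through the stream rewrites the widget's text,
# so the previously displayed output updates in place
source.map(lambda x: 'latest value: %s' % x).sink(lambda text: setattr(label, 'value', text))

source.emit(1.5)   # the label now reads "latest value: 1.5"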

In the following example we build our CSV-to-dataframe pipeline, which updates whenever new files appear in a directory. Whenever we drag files to the data directory on the left we see that all of our outputs update on the right.
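
For reference, a directory-watching pipeline along those lines can be wired up roughly as follows. The column names and the example dataframe are made up for illustration, and, as noted above, this API is experimental and subject to change:

import pandas as pd
from streamz import Stream

# an empty dataframe describing the expected schema; these columns are invented
example = pd.DataFrame({'name': [], 'x': [], 'y': []})

source = Stream.filenames('path/to/dir/*.csv')       # emits filenames as new files appear
sdf = source.map(pd.read_csv).to_dataframe(example=example)

sdf.rolling(20).x.mean().stream.sink(print)          # rolling mean over the last 20 rows
sdf.groupby(sdf.name).y.mean().stream.sink(print)    # per-name mean of y, updated live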

What is supported?

This project is very young and could use some help. There are plenty of holes in the API. That being said, the following works well:

Elementwise operations:

sdf['z'] = sdf.x + sdf.y
sdf = sdf[sdf.z > 2]

Simple reductions:

sdf.sum()
sdf.x.mean()

Groupby reductions:

sdf.groupby(sdf.x).y.mean()

Rolling reductions by number of rows or time window:

sdf.rolling(20).x.mean()
sdf.rolling('100ms').x.quantile(0.9)

Real-time plotting with Bokeh (one of my favorite features):

sdf.plot()

What’s missing?

  1. Parallel computing: The core streamz library has an optional Dask backend for parallel computing. I haven’t yet made any attempt to attach this to the dataframe implementation.
  2. Data ingestion from common streaming sources like Kafka. We’re in the process now of building asynchronous-aware wrappers around Kafka Python client libraries, so this is likely to come soon.
  3. Out-of-order data access: soon after parallel data ingestion (like reading from multiple Kafka partitions at once) we’ll need to figure out how to handle out-of-order data access. This is doable, but will take some effort. This is where more mature libraries like Flink are quite strong.
  4. Performance: Some of the operations above (particularly rolling operations) do involve non-trivial copying, especially with larger windows. We’re relying heavily on the Pandas library, which wasn’t designed with rapidly changing data in mind. Hopefully future iterations of Pandas (Arrow/libpandas/Pandas 2.0?) will make this more efficient.
  5. Filled out API: Many common operations (like variance) haven’t yet been implemented. Some of this is due to laziness and some is due to wanting to find the right algorithm.
  6. Robust plotting: Currently this works well for numeric data with a timeseries index but not so well for other data.

But most importantly this needs use by people with real problems to help us understand what here is valuable and what is unpleasant.

Help would be welcome with any of this.

You can install this from GitHub:

pip install git+https://github.com/mrocklin/streamz.git

Documentation and code are available in the streamz repository: https://github.com/mrocklin/streamz

Current work

Current and upcoming work is focused on data ingestion from Kafka and parallelizing with Dask.