Feb 17, 2016

Introducing Dask distributed

This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project.

tl;dr: We analyze JSON data on a cluster using pure Python projects.

Dask, a Python library for parallel computing, now works on clusters. During the past few months I and others have extended dask with a new distributed memory scheduler. This enables dask’s existing parallel algorithms to scale across 10s to 100s of nodes, and extends a subset of PyData to distributed computing. Over the next few weeks I and others will write about this system. Please note that dask+distributed is developing quickly and so the API is likely to shift around a bit.

Today we start simple with the typical cluster computing problem, parsing JSON records, filtering, and counting events using dask.bag and the new distributed scheduler. We’ll dive into more advanced problems in future posts.

A video version of this blogpost is available here.

GitHub Archive Data on S3

GitHub releases data dumps of their public event stream as gzip-compressed, line-delimited JSON. This data is too large to fit comfortably into memory, even on a sizable workstation. We could stream it from disk but, due to the compression and JSON encoding, this takes a while and so slogs down interactive use. For an interactive experience with data like this we need a distributed cluster.

Setup and Data

We provision nine m3.2xlarge nodes on EC2. These have eight cores and 30GB of RAM each. On this cluster we provision one scheduler and nine workers (see setup docs). (More on launching in later posts.) We have five months of data, from 2015-01-01 to 2015-05-31, on the githubarchive-data bucket in S3. This data is publicly available if you want to play with it on EC2. You can download the full dataset at https://www.githubarchive.org/.

The first record looks like the following:

{'actor': {'avatar_url': 'https://avatars.githubusercontent.com/u/9152315?',
'gravatar_id': '',
'id': 9152315,
'login': 'davidjhulse',
'url': 'https://api.github.com/users/davidjhulse'},
'created_at': '2015-01-01T00:00:00Z',
'id': '2489368070',
'payload': {'before': '86ffa724b4d70fce46e760f8cc080f5ec3d7d85f',
'commits': [{'author': {'email': '[email protected]',
'name': 'davidjhulse'},
'distinct': True,
'message': 'Altered BingBot.jar\n\nFixed issue with multiple account support',
'sha': 'a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81',
'url': 'https://api.github.com/repos/davidjhulse/davesbingrewardsbot/commits/a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81'}],
'distinct_size': 1,
'head': 'a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81',
'push_id': 536740396,
'ref': 'refs/heads/master',
'size': 1},
'public': True,
'repo': {'id': 28635890,
'name': 'davidjhulse/davesbingrewardsbot',
'url': 'https://api.github.com/repos/davidjhulse/davesbingrewardsbot'},
'type': 'PushEvent'}

So we have a large dataset on S3 and a moderately sized play cluster on EC2, which has access to S3 data at about 100MB/s per node. We’re ready to play.

Play

We start an IPython interpreter on our local laptop and connect to the dask scheduler running on the cluster. For the purposes of timing, the cluster is on the East Coast while the local machine is in California on commercial broadband internet.

>>> from distributed import Executor, s3
>>> e = Executor('54.173.84.107:8786')
>>> e
<Executor: scheduler=54.173.84.107:8786 workers=72 threads=72>

Our seventy-two worker processes come from nine workers with eight processes each. We chose processes rather than threads for this task because computations will be bound by the GIL. We will change this to threads in later examples.
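This tradeoff is easy to see locally. Here is a minimal sketch (plain standard library, not dask or distributed) comparing thread and process pools on JSON parsing; because json.loads holds the GIL, threads add little speedup, while processes parse truly in parallel. The data and worker counts are made up for illustration.

import json
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

lines = [json.dumps({'id': i, 'payload': list(range(100))})
         for i in range(100000)]

def parse_all(pool_cls):
    # Parse every line on an eight-way pool and return the elapsed time.
    with pool_cls(max_workers=8) as pool:
        start = time.time()
        list(pool.map(json.loads, lines, chunksize=1000))
        return time.time() - start

if __name__ == '__main__':  # guard needed when spawning process pools
    print('threads:   %.2f s' % parse_all(ThreadPoolExecutor))
    print('processes: %.2f s' % parse_all(ProcessPoolExecutor))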

We start by loading a single month of data into distributed memory.

import json
text = s3.read_text('githubarchive-data', '2015-01', compression='gzip')
records = text.map(json.loads)
records = e.persist(records)

The data lives in S3 in hourly files of gzip-compressed, line-delimited JSON. The s3.read_text and text.map functions produce dask.bag objects which track our operations in a lazily built task graph. When we ask the executor to persist this collection we ship those tasks off to the scheduler to run on all of the workers in parallel. The persist function gives us back another dask.bag pointing to these remotely running results. This persist function returns immediately, and the computation happens on the cluster in the background asynchronously. We gain control of our interpreter immediately while the cluster hums along.
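A quick way to convince yourself of this asynchrony is to time the persist call itself; a sketch, reusing the e and records objects from the session above:

import time

start = time.time()
records = e.persist(records)   # ships the graph to the scheduler, returns a new bag
print('persist returned in %.3f s' % (time.time() - start))  # near-instant

# Blocking calls like records.take(1) or records.count().compute() wait on,
# and then reuse, the in-memory results as they become available.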

The cluster takes around 40 seconds to download, decompress, and parse this data. If you watch the video embedded above you’ll see fancy progress bars.

We ask for a single record. This returns in around 200ms, which is fast enough that it feels instantaneous to a human.

>>> records.take(1)
({'actor': {'avatar_url': 'https://avatars.githubusercontent.com/u/9152315?',
'gravatar_id': '',
'id': 9152315,
'login': 'davidjhulse',
'url': 'https://api.github.com/users/davidjhulse'},
'created_at': '2015-01-01T00:00:00Z',
'id': '2489368070',
'payload': {'before': '86ffa724b4d70fce46e760f8cc080f5ec3d7d85f',
'commits': [{'author': {'email': '[email protected]',
'name': 'davidjhulse'},
'distinct': True,
'message': 'Altered BingBot.jar\n\nFixed issue with multiple account support',
'sha': 'a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81',
'url': 'https://api.github.com/repos/davidjhulse/davesbingrewardsbot/commits/a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81'}],
'distinct_size': 1,
'head': 'a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81',
'push_id': 536740396,
'ref': 'refs/heads/master',
'size': 1},
'public': True,
'repo': {'id': 28635890,
'name': 'davidjhulse/davesbingrewardsbot',
'url': 'https://api.github.com/repos/davidjhulse/davesbingrewardsbot'},
'type': 'PushEvent'},)

This particular event is a 'PushEvent'. Let’s quickly see all the kinds of events. For fun, we’ll also time the interaction:

>>> %time records.pluck('type').frequencies().compute()
CPU times: user 112 ms, sys: 0 ns, total: 112 ms
Wall time: 2.41 s

[('ReleaseEvent', 44312),
('MemberEvent', 69757),
('IssuesEvent', 693363),
('PublicEvent', 14614),
('CreateEvent', 1651300),
('PullRequestReviewCommentEvent', 214288),
('PullRequestEvent', 680879),
('ForkEvent', 491256),
('DeleteEvent', 256987),
('PushEvent', 7028566),
('IssueCommentEvent', 1322509),
('GollumEvent', 150861),
('CommitCommentEvent', 96468),
('WatchEvent', 1321546)]

And we compute the total count of all events for this month.

>>> %time records.count().compute()
CPU times: user 134 ms, sys: 133 µs, total: 134 ms
Wall time: 1.49 s

14036706

We see that it takes a few seconds to walk through the data (and perform all scheduling overhead). The scheduler adds about a millisecond of overhead per task, and there are about 1000 partitions/files here (the GitHub data is split by hour and there are about 730 hours in a month) so most of the cost here is overhead.
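The back-of-envelope arithmetic behind that claim, assuming roughly one task per hourly file:

files = 730                 # about one gzipped file per hour in a month
overhead_per_task = 0.001   # rough scheduler cost per task, in seconds
print(files * overhead_per_task)   # ~0.73 s of the ~1.5 s wall time is overhead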

Investigate Jupyter

We investigate the activities of Project Jupyter. We chose this project because it’s sizable and because we understand the players involved and so can check our accuracy. This will require us to filter our data to a much smaller subset, then find popular repositories and members.

>>> jupyter = (records.filter(lambda d: d['repo']['name'].startswith('jupyter/'))
.repartition(10))
>>> jupyter = e.persist(jupyter)

All records, regardless of event type, have a repository with a name like 'organization/repository' in typical GitHub fashion. We keep all records whose repository name starts with 'jupyter/'. Additionally, because this dataset is likely much smaller, we push all of these records into just ten partitions. This dramatically reduces scheduling overhead. The persist call hands this computation off to the scheduler and then gives us back our collection that points to that computing result. Filtering this month for Jupyter events takes about 7.5 seconds. Afterwards, computations on this subset feel snappy.
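You can check the effect of repartitioning directly; dask.bag collections carry an npartitions attribute. A sketch, continuing the session above:

# Fewer partitions means fewer tasks per subsequent operation,
# and therefore less scheduler overhead on this now-tiny dataset.
print(records.npartitions)   # roughly one partition per hourly file
print(jupyter.npartitions)   # 10, after repartition(10)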

>>> %time jupyter.count().compute()
CPU times: user 5.19 ms, sys: 97 µs, total: 5.28 ms
Wall time: 199 ms

747

>>> %time jupyter.take(1)
CPU times: user 7.01 ms, sys: 259 µs, total: 7.27 ms
Wall time: 182 ms

({'actor': {'avatar_url': 'https://avatars.githubusercontent.com/u/26679?',
'gravatar_id': '',
'id': 26679,
'login': 'marksteve',
'url': 'https://api.github.com/users/marksteve'},
'created_at': '2015-01-01T13:25:44Z',
'id': '2489612400',
'org': {'avatar_url': 'https://avatars.githubusercontent.com/u/7388996?',
'gravatar_id': '',
'id': 7388996,
'login': 'jupyter',
'url': 'https://api.github.com/orgs/jupyter'},
'payload': {'action': 'started'},
'public': True,
'repo': {'id': 5303123,
'name': 'jupyter/nbviewer',
'url': 'https://api.github.com/repos/jupyter/nbviewer'},
'type': 'WatchEvent'},)

So the first event of the year was by 'marksteve', who decided to watch the 'nbviewer' repository on New Year’s Day.

Notice that these computations take around 200ms. I can’t get below this from my local machine, so we’re likely bound by communication with such a remote location. A 200ms latency is not great if you’re playing a video game, but it’s decent for interactive computing.
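One way to measure that floor is to time a single trivial task; a sketch, assuming the e executor from the session above:

import time

def inc(x):
    return x + 1

start = time.time()
e.submit(inc, 1).result()   # one tiny task: a full laptop-to-cluster round trip
print('round trip: %.3f s' % (time.time() - start))   # ~0.2 s from California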

Here are all of the Jupyter repositories touched in the month of January:

>>> %time jupyter.pluck('repo').pluck('name').distinct().compute()
CPU times: user 2.84 ms, sys: 4.03 ms, total: 6.86 ms
Wall time: 204 ms

['jupyter/dockerspawner',
'jupyter/design',
'jupyter/docker-demo-images',
'jupyter/jupyterhub',
'jupyter/configurable-http-proxy',
'jupyter/nbshot',
'jupyter/sudospawner',
'jupyter/colaboratory',
'jupyter/strata-sv-2015-tutorial',
'jupyter/tmpnb-deploy',
'jupyter/nature-demo',
'jupyter/nbcache',
'jupyter/jupyter.github.io',
'jupyter/try.jupyter.org',
'jupyter/jupyter-drive',
'jupyter/tmpnb',
'jupyter/tmpnb-redirector',
'jupyter/nbgrader',
'jupyter/nbindex',
'jupyter/nbviewer',
'jupyter/oauthenticator']

And the top ten most active people on these repositories.

>>> %time (jupyter.pluck('actor')
.pluck('login')
.frequencies()
.topk(10, lambda kv: kv[1])
.compute())
CPU times: user 8.03 ms, sys: 90 µs, total: 8.12 ms
Wall time: 226 ms

[('rgbkrk', 156),
('minrk', 87),
('Carreau', 87),
('KesterTong', 74),
('jhamrick', 70),
('bollwyvl', 25),
('pkt', 18),
('ssanderson', 13),
('smashwilson', 13),
('ellisonbg', 13)]

Nothing too surprising here if you know these folks.

Full Dataset

The full five months of data is too large to fit in memory, even for this cluster. When we represent semi-structured data like this with dynamic data structures like lists and dictionaries there is quite a bit of memory bloat. Some careful attention to efficient semi-structured storage here could save us from having to switch to such a large cluster, but that will have to be the topic of another post.
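To see the bloat concretely, here is a small, illustrative sketch comparing the size of a line of JSON text against the same record as Python objects (the record below is a made-up fragment):

import json
import sys

def total_size(obj):
    # Rough recursive footprint of nested dicts, lists, and strings.
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(total_size(k) + total_size(v) for k, v in obj.items())
    elif isinstance(obj, (list, tuple)):
        size += sum(total_size(x) for x in obj)
    return size

line = '{"id": "2489368070", "type": "PushEvent", "public": true}'
print(len(line))                     # bytes of raw JSON text
print(total_size(json.loads(line)))  # several times larger as Python objects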

Instead, we operate efficiently on this dataset by flowing it through memory, persisting only the records we care about. The distributed dask scheduler descends from the single-machine dask scheduler, which was quite good at flowing through a computation and intelligently removing intermediate results.

From a user API perspective, we call persist only on the jupyter dataset, and not the full records dataset.

>>> full = (s3.read_text('githubarchive-data', '2015', compression='gzip')
            .map(json.loads))

>>> jupyter = (full.filter(lambda d: d['repo']['name'].startswith('jupyter/'))
.repartition(10))

>>> jupyter = e.persist(jupyter)

It takes 2m36s to download, decompress, and parse the five months of publicly available GitHub events, filtering for all Jupyter events, on nine m3.2xlarges.

There were seven thousand such events.

>>> jupyter.count().compute()
7065

We find which repositories saw the most activity during that time:

>>> %time (jupyter.pluck('repo')
.pluck('name')
.frequencies()
.topk(20, lambda kv: kv[1])
.compute())
CPU times: user 6.98 ms, sys: 474 µs, total: 7.46 ms
Wall time: 219 ms

[('jupyter/jupyterhub', 1262),
('jupyter/nbgrader', 1235),
('jupyter/nbviewer', 846),
('jupyter/jupyter_notebook', 507),
('jupyter/jupyter-drive', 505),
('jupyter/notebook', 451),
('jupyter/docker-demo-images', 363),
('jupyter/tmpnb', 284),
('jupyter/jupyter_client', 162),
('jupyter/dockerspawner', 149),
('jupyter/colaboratory', 134),
('jupyter/jupyter_core', 127),
('jupyter/strata-sv-2015-tutorial', 108),
('jupyter/jupyter_nbconvert', 103),
('jupyter/configurable-http-proxy', 89),
('jupyter/hubpress.io', 85),
('jupyter/jupyter.github.io', 84),
('jupyter/tmpnb-deploy', 76),
('jupyter/nbconvert', 66),
('jupyter/jupyter_qtconsole', 59)]

We see that projects like jupyterhub were quite active during that time while, surprisingly, nbconvert saw relatively little action.

Local Data

The Jupyter data is quite small and easily fits on a single machine. Let’s bring the data to our local machine so that we can compare times:

>>> %time L = jupyter.compute()
CPU times: user 4.74 s, sys: 10.9 s, total: 15.7 s
Wall time: 30.2 s

It takes surprisingly long to download the data, but once it’s here, we can iterate far more quickly with basic Python.

>>> from toolz.curried import pluck, frequencies, topk, pipe
>>> %time pipe(L, pluck('repo'), pluck('name'), frequencies,
dict.items, topk(20, key=lambda kv: kv[1]), list)
CPU times: user 11.8 ms, sys: 0 ns, total: 11.8 ms
Wall time: 11.5 ms

[('jupyter/jupyterhub', 1262),
('jupyter/nbgrader', 1235),
('jupyter/nbviewer', 846),
('jupyter/jupyter_notebook', 507),
('jupyter/jupyter-drive', 505),
('jupyter/notebook', 451),
('jupyter/docker-demo-images', 363),
('jupyter/tmpnb', 284),
('jupyter/jupyter_client', 162),
('jupyter/dockerspawner', 149),
('jupyter/colaboratory', 134),
('jupyter/jupyter_core', 127),
('jupyter/strata-sv-2015-tutorial', 108),
('jupyter/jupyter_nbconvert', 103),
('jupyter/configurable-http-proxy', 89),
('jupyter/hubpress.io', 85),
('jupyter/jupyter.github.io', 84),
('jupyter/tmpnb-deploy', 76),
('jupyter/nbconvert', 66),
('jupyter/jupyter_qtconsole', 59)]

The difference here is 20x, which is a good reminder that, once you no longer have a large problem, you should probably eschew distributed systems and act locally.

Conclusion

Downloading, decompressing, parsing, filtering, and counting JSON records is the new wordcount. It’s the first problem anyone sees. Fortunately it’s both easy to solve and the common case. Woo hoo!

Here we saw that dask+distributed handles the common case decently well, and with a pure Python stack. Typically Python users rely on a JVM technology like Hadoop/Spark/Storm to distribute their computations. Here we have Python distributing Python; there are some usability gains to be had here like nice stack traces, a bit less serialization overhead, and attention to other Pythonic style choices.

Over the next few posts I intend to deviate from this common case. Most “Big Data” technologies were designed to solve typical data munging problems found in web companies or with simple database operations in mind. Python users care about these things too, but they also reach out to a wide variety of fields. In dask+distributed development we care about the common case, but also support less traditional workflows that are commonly found in the life, physical, and algorithmic sciences.

By designing to support these more extreme cases we’ve nailed some common pain points in current distributed systems. Today we’ve seen low latency and remote control; in the future we’ll see others.

What doesn’t work

I’ll have an honest section like this at the end of each upcoming post describing what doesn’t work, what still feels broken, or what I would have done differently with more time.

  • The imports for dask and distributed are still strange. They’re two separate codebases that play very nicely together. Unfortunately the functionality you need is sometimes in one or in the other and it’s not immediately clear to the novice user where to go. For example dask.bag, the collection we’re using for records, jupyter, etc., is in dask but the s3 module is within the distributed library. We’ll have to merge things at some point in the near-to-moderate future. Ditto for the API: there are compute methods both on the dask collections (records.compute()) and on the distributed executor (e.compute(records)) that behave slightly differently.
  • We lack an efficient distributed shuffle algorithm. This is very important if you want to use operations like .groupby (which you should avoid anyway). The user API here doesn’t even cleanly warn users that this is missing in the distributed case, which is kind of a mess. (It works fine on a single machine.) Efficient alternatives like foldby are available; see the sketch after this list.
  • I would have liked to run this experiment directly on the cluster to see how low we could have gone below the 200ms barrier we ran into here.
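For the curious, here is a minimal foldby sketch that counts events per type without a shuffle, equivalent to the pluck('type').frequencies() call earlier. The argument roles are the key function, a per-record fold, its initial value, a cross-partition merge, and the merge’s initial value:

counts = records.foldby(lambda d: d['type'],         # key function
                        lambda total, d: total + 1,  # fold one record into a count
                        0,                           # initial accumulator
                        lambda a, b: a + b,          # merge counts across partitions
                        0)                           # initial value for the merge
result = counts.compute()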

Links

  • dask, the original project
  • dask.distributed, the distributed memory scheduler powering the cluster computing
  • dask.bag, the user API we’ve used in this post.
  • This post largely repeats work by Blake Griffith in a similar post last year with an older iteration of the dask distributed scheduler