Feb 17, 2015

Towards Out-of-core ND-Arrays -- Dask + Toolz = Bag

This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project.

tl;dr We use dask to build a parallel Python list.

Introduction

This is the seventh in a sequence of posts constructing an out-of-core nd-array using NumPy and dask. You can view these posts here:

  1. Simple task scheduling
  2. Frontend usability
  3. A multi-threaded scheduler
  4. Matrix Multiply Benchmark
  5. Spilling to disk
  6. Slicing and Stacking

Today we take a break from ND-Arrays and show how task scheduling can attack other collections like the simple list of Python objects.

Unstructured Data

Often before we have an ndarray or a table/DataFrame we have unstructured data like log files. In this case tuned subsets of the language (e.g. numpy/pandas) aren't sufficient; we need the full Python language.

My usual approach to the inconveniently large dump of log files is to use Python streaming iterators along with multiprocessing or IPython Parallel on a single large machine. I often write/speak about this workflow when discussing toolz.

This workflow grows complex for most users when you introduce many processes. To resolve this we build our normal tricks into a new dask.Bag collection.

Bag

In the same way that dask.array mimics NumPy operations (e.g. matrix multiply, slicing), dask.bag mimics functional operations like map, filter, reduce found in the standard library as well as many of the streaming functions found in toolz.

  • Dask array = NumPy + threads
  • Dask bag = Python/Toolz + processes
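
To ground the analogy, here is a minimal single-core sketch of the toolz workflow that Bag parallelizes (the 'data/*.txt' paths are stand-ins for your files):

from glob import glob
from toolz import concat, frequencies

lines = concat(open(fn) for fn in glob('data/*.txt'))   # lazily chain all files
words = concat(line.split() for line in lines)          # lazily split into words
counts = frequencies(words)                             # dict: word -> count

Bag keeps this streaming flavor but runs it across processes.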

Example

Here’s the obligatory wordcount example:

>>> from toolz import concat
>>> from dask.bag import Bag

>>> b = Bag.from_filenames('data/*.txt')

>>> def stem(word):
...     """ Stem word to primitive form """
...     return word.lower().rstrip(",.!:;'-\"").lstrip("'\"")

>>> dict(b.map(str.split).map_partitions(concat).map(stem).frequencies())
{...}

We use all of our cores and stream through memory on each core. We use multiprocessing but could get fancier with some work.
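
For a sense of what Bag automates, here is a hand-rolled sketch of one such trick with only the standard library (count_words and the file names are hypothetical):

from multiprocessing import Pool

def count_words(filename):
    # Count words in one file; runs inside a worker process
    counts = {}
    with open(filename) as f:
        for word in f.read().split():
            counts[word] = counts.get(word, 0) + 1
    return counts

pool = Pool()   # one worker process per core
results = pool.imap_unordered(count_words, ['data/a.txt', 'data/b.txt'])

Bag wraps this pattern up and adds the combine step, serialization, and error reporting described below.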

Related Work

There are a lot of much larger, much more powerful systems that have a similar API, notably Spark and DPark. If you have Big Data and need to use very many machines then you should stop reading this and go install them.

I mostly made dask.bag because

  1. It was very easy given the work already done on dask.array
  2. I often only need multiprocessing + a heavy machine
  3. I wanted something trivially pip installable that didn’t use the JVM

But again, if you have Big Data, then this isn’t for you.

Design

As before, a Bag is just a dict holding tasks, along with a little metadata.

>>> d = {('x', 0): (range, 5),
...      ('x', 1): (range, 5),
...      ('x', 2): (range, 5)}

>>> from dask.bag import Bag
>>> b = Bag(d, 'x', npartitions=3)

In this way we break up one collection

[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4]

into three independent pieces

[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]

When we abstractly operate on the large collection…

>>> b2 = b.map(lambda x: x * 10)

… we generate new tasks to operate on each of the components.

>>> b2.dask
{('x', 0): (range, 5),
 ('x', 1): (range, 5),
 ('x', 2): (range, 5),
 ('bag-1', 0): (map, lambda x: x * 10, ('x', 0)),
 ('bag-1', 1): (map, lambda x: x * 10, ('x', 1)),
 ('bag-1', 2): (map, lambda x: x * 10, ('x', 2))}

And when we ask for concrete results (the call to list) we spin up a scheduler to execute the resulting dependency graph of tasks.

>>> list(b2)
[0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 0, 10, 20, 30, 40]
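
Because the graph is plain data we can also evaluate a single key ourselves with dask's simple synchronous scheduler, which is handy for debugging (a sketch; dask.get is the synchronous get function):

>>> import dask
>>> list(dask.get(b2.dask, ('bag-1', 1)))   # compute just one partition
[0, 10, 20, 30, 40]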

More complex operations yield more complex dasks. Beware, dask code is pretty Lispy. Fortunately these dasks are internal; users don’t interact with them.

>>> iseven = lambda x: x % 2 == 0
>>> b.filter(iseven).count().dask
{'bag-3': (sum, [('bag-2', 1), ('bag-2', 2), ('bag-2', 0)]),
 ('bag-2', 0): (count, (filter, iseven, (range, 5))),
 ('bag-2', 1): (count, (filter, iseven, (range, 5))),
 ('bag-2', 2): (count, (filter, iseven, (range, 5)))}
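
To read these Lispy tuples: (f, x, y) means f(x, y), with tuple arguments evaluated recursively. A tiny sketch of that rule (ignoring keys that point at other tasks; count here is a stand-in for dask's internal counting helper):

def evaluate(task):
    # (callable, args...) tuples are calls; everything else is a literal
    if isinstance(task, tuple) and callable(task[0]):
        head, rest = task[0], task[1:]
        return head(*[evaluate(arg) for arg in rest])
    return task

iseven = lambda x: x % 2 == 0
count = lambda seq: sum(1 for _ in seq)          # stand-in count helper
evaluate((count, (filter, iseven, (range, 5))))  # -> 3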

The current interface for Bag has the following operations:

    all, any, count, filter, fold, foldby, frequencies, join, map,
    map_partitions, max, mean, min, product, std, sum, topk, var
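
A couple of these in action on the toy bag from above (a sketch; the outputs follow from the three range(5) partitions):

>>> dict(b.frequencies())      # how often each element occurs
{0: 3, 1: 3, 2: 3, 3: 3, 4: 3}

>>> list(b.topk(2))            # the two largest elements
[4, 4]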

Manipulations of bags create task dependency graphs. We eventually execute these graphs in parallel.

Execution

We repurpose the threaded scheduler we used for arrays to support multiprocessing, providing parallelism even on pure Python code. We’re careful to avoid unnecessary data transfer. None of the operations listed above requires significant communication. Notably, we don’t have any concept of shuffling or scatter/gather.

We use dill to serialize functions properly and to collect/report errors, two issues that plague naive use of multiprocessing in Python.
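
A quick illustration of the serialization half (the lambda is arbitrary; pickle and dill are the real libraries):

>>> import pickle, dill
>>> f = lambda x: x * 10
>>> dill.loads(dill.dumps(f))(4)   # dill round-trips lambdas
40
>>> pickle.dumps(f)                # the stdlib pickler refuses them
PicklingError: Can't pickle <function <lambda> ...>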

>>> list(b.map(lambda x: x * 10)) # This works!
[0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 0, 10, 20, 30, 40]

>>> list(b.map(lambda x: x / 0)) # This errs gracefully!
ZeroDivisionError: Exception in remote Process

integer division or modulo by zero

Traceback:
...

These tricks remove the need for user expertise.

Productive Sweet Spot

I think that there is a productive sweet spot in the following configuration:

  1. Pure Python functions
  2. Streaming/lazy data
  3. Multiprocessing
  4. A single large machine or a few machines in an informal cluster

This setup is common and it’s capable of handling terabyte-scale workflows. In my brief experience people rarely take this route. They use single-threaded in-memory Python until it breaks, and then seek out Big Data infrastructure like Hadoop/Spark at relatively high productivity overhead.

Your workstation can scale bigger than you think.

Example

Here is about a gigabyte of network flow data, recording which computers made connections to which other computers on the UC-Berkeley campus in 1996.

846890339:661920 846890339:755475 846890340:197141 168.237.7.10:1163 83.153.38.208:80 2 8 4294967295 4294967295 846615753 176 2462 39 GET 21068906053917068819..html HTTP/1.0

846890340:989181 846890341:2147 846890341:2268 13.35.251.117:1269 207.83.232.163:80 10 0 842099997 4294967295 4294967295 64 1 38 GET 20271810743860818265..gif HTTP/1.0

846890341:80714 846890341:90331 846890341:90451 13.35.251.117:1270 207.83.232.163:80 10 0 842099995 4294967295 4294967295 64 1 38 GET 38127854093537985420..gif HTTP/1.0

This is actually relatively clean. Many of the fields are space delimited (not all) and I’ve already compiled and run the decade-old C code needed to decompress it from its original format.

Let’s use Bag and regular expressions to parse this.

In [1]: from dask.bag import Bag, into

In [2]: b = Bag.from_filenames('UCB-home-IP*.log')

In [3]: import re

In [4]: pattern = r"""
...: (?P<request_time>\d+:\d+)
...: (?P<response_start>\d+:\d+)
...: (?P<response_end>\d+:\d+)
...: (?P<client_ip>\d+\.\d+\.\d+\.\d+):(?P<client_port>\d+)
...: (?P<server_ip>\d+\.\d+\.\d+\.\d+):(?P<server_port>\d+)
...: (?P<client_headers>\d+)
...: (?P<server_headers>\d+)
...: (?P<if_modified_since>\d+)
...: (?P<response_header_length>\d+)
...: (?P<response_data_length>\d+)
...: (?P<request_url_length>\d+)
...: (?P<expires>\d+)
...: (?P<last_modified>\d+)
...: (?P<method>\w+)
...: (?P<domain>\d+..)\.(?P<extension>\w*)(?P<rest_of_url>\S*)
...: (?P<protocol>.*)""".strip().replace('\n', r'\s+')

In [5]: prog = re.compile(pattern)

In [6]: records = b.map(prog.match).map(lambda m: m.groupdict())
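
If any line failed to match, prog.match would return None; a defensive variant (my addition, not part of the original session) drops those before calling groupdict:

records = (b.map(prog.match)
           .filter(lambda m: m is not None)   # drop unparseable lines
           .map(lambda m: m.groupdict()))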

This returns instantly. We only compute results when necessary. We trigger computation by calling list.

In [7]: list(records.take(1))
Out[7]:
[{'client_headers': '2',
  'client_ip': '168.237.7.10',
  'client_port': '1163',
  'domain': '21068906053917068819.',
  'expires': '2462',
  'extension': 'html',
  'if_modified_since': '4294967295',
  'last_modified': '39',
  'method': 'GET',
  'protocol': 'HTTP/1.0',
  'request_time': '846890339:661920',
  'request_url_length': '176',
  'response_data_length': '846615753',
  'response_end': '846890340:197141',
  'response_header_length': '4294967295',
  'response_start': '846890339:755475',
  'rest_of_url': '',
  'server_headers': '8',
  'server_ip': '83.153.38.208',
  'server_port': '80'}]

Because Bag operates lazily, this small result also returns immediately.

To demonstrate depth we find the ten client/server pairs with the most connections.

In [8]: counts = records.pluck(['client_ip', 'server_ip']).frequencies()

In [9]: %time list(counts.topk(10, key=lambda x: x[1]))
CPU times: user 11.2 s, sys: 1.15 s, total: 12.3 s
Wall time: 50.4 s
Out[9]:
[(('247.193.34.56', '243.182.247.102'), 35353),
 (('172.219.28.251', '47.61.128.1'), 22333),
 (('240.97.200.0', '108.146.202.184'), 17492),
 (('229.112.177.58', '47.61.128.1'), 12993),
 (('146.214.34.69', '119.153.78.6'), 12554),
 (('17.32.139.174', '179.135.20.36'), 10166),
 (('97.166.76.88', '65.81.49.125'), 8155),
 (('55.156.159.21', '157.229.248.255'), 7533),
 (('55.156.159.21', '124.77.75.86'), 7506),
 (('55.156.159.21', '97.5.181.76'), 7501)]

Comparison with Spark

First, it is silly and unfair to compare with PySpark running locally. PySpark offers much more in a distributed context.

In [1]: import pyspark

In [2]: sc = pyspark.SparkContext('local')

In [3]: from glob import glob
In [4]: filenames = sorted(glob('UCB-home-*.log'))
In [5]: rdd = sc.parallelize(filenames, numSlices=4)

In [6]: import re
In [7]: pattern = ...
In [8]: prog = re.compile(pattern)

In [9]: lines = rdd.flatMap(lambda fn: list(open(fn)))
In [10]: records = lines.map(lambda line: prog.match(line).groupdict())
In [11]: ips = records.map(lambda rec: (rec['client_ip'], rec['server_ip']))

In [12]: from toolz import topk
In [13]: %time dict(topk(10, ips.countByValue().items(), key=1))
CPU times: user 1.32 s, sys: 52.2 ms, total: 1.37 s
Wall time: 1min 21s
Out[13]:
{('146.214.34.69', '119.153.78.6'): 12554,
 ('17.32.139.174', '179.135.20.36'): 10166,
 ('172.219.28.251', '47.61.128.1'): 22333,
 ('229.112.177.58', '47.61.128.1'): 12993,
 ('240.97.200.0', '108.146.202.184'): 17492,
 ('247.193.34.56', '243.182.247.102'): 35353,
 ('55.156.159.21', '124.77.75.86'): 7506,
 ('55.156.159.21', '157.229.248.255'): 7533,
 ('55.156.159.21', '97.5.181.76'): 7501,
 ('97.166.76.88', '65.81.49.125'): 8155}

So, given a compute-bound, mostly embarrassingly parallel task (regexes are comparatively expensive) on a single machine, they are comparable.

Reasons you would want to use Spark

  • You want to use many machines and interact with HDFS
  • Shuffling operations

Reasons you would want to use dask.bag

  • Trivial installation
  • No mucking about with JVM heap sizes or config files
  • Nice error reporting. Spark error reporting includes the typical giant Java stack trace that comes with any JVM solution.
  • Easier/simpler for Python programmers to hack on. The implementation is 350 lines including comments.

Again, this is really just a toy experiment to show that the dask model isn’t just about arrays. I absolutely do not want to throw Dask in the ring with Spark.

Conclusion

However, I do want to stress the importance of single-machine parallelism. Dask.bag targets this application well and leverages common hardware in a simple way that is both natural and accessible to most Python programmers.

A skilled developer could extend this to work in a distributed memory context. The logic to create the task dependency graphs is separate from the scheduler.
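
As a sketch of that separation, any hand-built graph can be handed to a different get function, here the threaded scheduler used earlier:

from dask.threaded import get

dsk = {'data': (range, 5),
       'total': (sum, 'data')}   # 'data' is a reference to another task
get(dsk, 'total')                # -> 10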

Special thanks to Erik Welch for finely crafting the dask optimization passes that keep the data flowing smoothly.