This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project.
In this post we analyze weather data across a cluster using NumPy in parallel with dask.array. We focus on the following:
This blogpost has an accompanying screencast which might be a bit more fun than this text version.
This is the third in a sequence of blogposts about dask.distributed:
We wanted to emulate the typical academic cluster setup using a job scheduler like SunGridEngine (similar to SLURM, Torque, PBS scripts, and other technologies), a shared network file system, and typical binary stored arrays in NetCDF files (similar to HDF5).
To this end we used Starcluster, a quick way to set up such a cluster on EC2 with SGE and NFS, and we downloaded data from the European Centre for Meteorology and Weather Forecasting.
To deploy dask's distributed scheduler with SGE we made a scheduler on the master node:
[email protected]:~$ dscheduler
distributed.scheduler - INFO - Start Scheduler at: 172.31.7.88:8786
And then used the qsub command to start four dask workers, pointing to the scheduler address:
[email protected]:~$ qsub -b y -V dworker 172.31.7.88:8786
Your job 1 ("dworker") has been submitted
[email protected]:~$ qsub -b y -V dworker 172.31.7.88:8786
Your job 2 ("dworker") has been submitted
[email protected]:~$ qsub -b y -V dworker 172.31.7.88:8786
Your job 3 ("dworker") has been submitted
[email protected]:~$ qsub -b y -V dworker 172.31.7.88:8786
Your job 4 ("dworker") has been submitted
After a few seconds these workers start on various nodes in the cluster and connect to the scheduler.
On the shared NFS drive we've downloaded several NetCDF3 files, each holding the global temperature every six hours for a single day:
>>> from glob import glob
>>> filenames = sorted(glob('*.nc3'))
>>> filenames[:5]
['2014-01-01.nc3',
'2014-01-02.nc3',
'2014-01-03.nc3',
'2014-01-04.nc3',
'2014-01-05.nc3']
We use conda to install the netCDF4 library and make a small function to read the t2m variable for "temperature at two meters elevation" from a single filename:
conda install netcdf4
import netCDF4
def load_temperature(fn):
    with netCDF4.Dataset(fn) as f:
        return f.variables['t2m'][:]
This converts a single file into a single numpy array in memory. We could call this on an individual file locally as follows:
>>> load_temperature(filenames[0])
array([[[ 253.96238624, 253.96238624, 253.96238624, ..., 253.96238624,
253.96238624, 253.96238624],
[ 252.80590921, 252.81070124, 252.81389593, ..., 252.79792249,
252.80111718, 252.80271452],
...
>>> load_temperature(filenames[0]).shape
(4, 721, 1440)
Our dataset has dimensions of (time, latitude, longitude). Note above that each day has four time entries (measurements every six hours).
The NFS share set up by Starcluster is unfortunately quite small. We were only able to fit around five months of data (136 days) on the shared disk.
We want to call the load_temperature function on our list of filenames on each of our four workers. We connect a dask Executor to our scheduler address and then map our function over our filenames:
>>> from distributed import Executor, progress
>>> e = Executor('172.31.7.88:8786')
>>> e
<Executor: scheduler=172.31.7.88:8786 workers=4 threads=32>
>>> futures = e.map(load_temperature, filenames)
>>> progress(futures)
After this completes we have several numpy arrays scattered about the memory of each of our four workers.
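If we want to check one of these remote results we can gather it back to the local session; each future resolves to a single day's NumPy array, with the same shape we saw when loading a file locally:

>>> futures[0].result().shape   # pull one day's array back to the client
(4, 721, 1440)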
We coordinate these many numpy arrays into a single logical dask array as follows:
>>> from distributed.collections import futures_to_dask_arrays
>>> xs = futures_to_dask_arrays(futures) # many small dask arrays
>>> import dask.array as da
>>> x = da.concatenate(xs, axis=0) # one large dask array, joined by time
>>> x
dask.array<concate..., shape=(544, 721, 1440), dtype=float64, chunksize=(4, 721, 1440)>
This single logical dask array is composed of 136 numpy arrays spread across our cluster (136 files with four time steps each gives the 544 time steps above). Operations on the single dask array will trigger many operations on each of our numpy arrays.
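We can see that chunk structure directly by inspecting the dask array: the time axis is split into one four-step chunk per file, while latitude and longitude remain whole.

>>> len(x.chunks[0])    # 136 chunks along the time axis, one per file
136
>>> x.chunks[0][:5]     # each chunk holds four six-hourly time steps
(4, 4, 4, 4, 4)
>>> x.chunks[1:]        # latitude and longitude are not split
((721,), (1440,))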
We can now interact with our dataset using standard NumPy syntax and other PyData libraries. Below we pull out a single time slice and render it to the screen with matplotlib.
from matplotlib import pyplot as plt
plt.imshow(x[100, :, :].compute(), cmap='viridis')
plt.colorbar()
In the screencast version of this post we hook this up to an IPython slider widget and scroll around in time, which is fun.
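A minimal sketch of that widget looks something like the following, assuming the interact decorator from ipywidgets is available in the notebook (the exact slider syntax varies between ipywidgets versions, and plot_time is just an illustrative name):

from ipywidgets import interact

@interact(time=[0, x.shape[0] - 1])      # scrub through the time axis
def plot_time(time):
    plt.imshow(x[time, :, :].compute(), cmap='viridis')
    plt.colorbar()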
We benchmark a few representative operations to look at the strengths and weaknesses of the distributed system.
This single element computation accesses a single number from a single NumPy array of our dataset. It is bound by a network roundtrip from client to scheduler, to worker, and back.
>>> %time x[0, 0, 0].compute()
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 9.72 ms
This time slice computation pulls around 8 MB (721 × 1440 float64 values) from a single NumPy array on a single worker. It is likely bound by network bandwidth.
>>> %time x[0].compute()
CPU times: user 24 ms, sys: 24 ms, total: 48 ms
Wall time: 274 ms
This mean computation touches every number in every NumPy array across all of our workers. Computing means is quite fast, so this is likely bound by scheduler overhead.
>>> %time x.mean().compute()
CPU times: user 88 ms, sys: 0 ns, total: 88 ms
Wall time: 422 ms
To make these times feel more visceral we hook up these computations to IPython widgets.
This first example looks fairly fluid. It only touches a single worker and returns a small result. It is cheap because it indexes in a way that is well aligned with how our NumPy arrays are split up by time.
@interact(time=[0, x.shape[0] - 1])
def f(time):
    return x[time, :, :].mean().compute()
This second example is less fluid because we index across our NumPy chunks. Each computation touches all of our data. It's still not bad though, and quite acceptable by today's standards of interactive distributed data science.
@interact(lat=[0, x.shape[1] - 1])
def f(lat):
    return x[:, lat, :].mean().compute()
Until now we've only performed simple calculations on our data, usually grabbing out means. The image of the temperature above looks unsurprising. The image is dominated by the facts that land is warmer than oceans and that the equator is warmer than the poles. No surprises there.
To make things more interesting we subtract off the mean and divide by the standard deviation over time. This will tell us how unexpectedly hot or cold a particular point was, relative to all measurements of that point over time. This gives us something like a geo-located Z-Score.
z = (x - x.mean(axis=0)) / x.std(axis=0)
z = e.persist(z)
progress(z)
plt.imshow(z[100, :, :].compute(), cmap='RdBu_r')  # an arbitrary single time slice
plt.colorbar()
We can now see much finer structure of the currents of the day. In the screencast version we hook this dataset up to a slider as well and inspect various times.
I've avoided displaying GIFs of full images changing in this post to keep the size down; however, we can easily render a plot of average temperature by latitude changing over time here:
import numpy as np
xrange = 90 - np.arange(z.shape[1]) / 4

@interact(time=[0, z.shape[0] - 1])
def f(time):
    plt.figure(figsize=(10, 4))
    plt.plot(xrange, z[time].mean(axis=1).compute())
    plt.ylabel("Normalized Temperature")
    plt.xlabel("Latitude (degrees)")
We showed how to use distributed dask.arrays on a typical academic cluster. I've had several conversations with different groups about this topic; it seems to be a common case. I hope that the instructions at the beginning of this post prove to be helpful to others.
It is really satisfying to me to couple interactive widgets with data on a cluster in an intuitive way. This sort of fluid interaction on larger datasets is a core problem in modern data science.
As always I'll include a section like this on what didn't work well or what I would have done with more time: