Feb 13, 2015

Towards Out-of-core ND-Arrays -- Slicing and Stacking

This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project.

tl;dr Dask.arrays can slice and stack. This is useful for weather data.

Introduction

This is the sixth in a sequence of posts constructing an out-of-core nd-array using NumPy and dask. You can view these posts here:

  1. Simple task scheduling
  2. Frontend usability
  3. A multi-threaded scheduler
  4. Matrix Multiply Benchmark
  5. Spilling to disk

Now we talk about slicing and stacking. We use meteorological data as an example use case.

Slicing

Dask.array now supports most of the NumPy slicing syntax. In particular it supports the following:

  • Slicing by integers and slices x[0, :5]
  • Slicing by a list/np.ndarray of integers x[[1, 2, 4]]
  • Slicing by a list/np.ndarray of booleans x[[False, True, True, False, True]]

It does not currently support the following:

  • Slicing one dask.array with another x[x > 0]
  • Slicing with lists in multiple axes x[[1, 2, 3], [3, 2, 1]]
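
For reference, here is a minimal sketch of the supported forms. The array contents and blockshape below are made up for illustration; the result shapes follow the usual NumPy semantics.

>>> import numpy as np
>>> import dask.array as da
>>> x = da.from_array(np.arange(100).reshape((10, 10)), blockshape=(5, 5))

>>> x[0, :5].shape                          # integers and slices
(5,)
>>> x[[1, 2, 4]].shape                      # list of integers
(3, 10)
>>> x[np.array([True, False] * 5)].shape    # boolean mask along the first axis
(5, 10)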

Stack and Concatenate

We often store large arrays on disk in many different files. We want to stack or concatenate these arrays together into one logical array. Dask solves this problem with the stack and concatenate functions, which stitch many arrays together into a single array, either creating a new dimension with stack or joining along an existing dimension with concatenate.

Stack

We stack many existing dask arrays into a new array, creating a new dimension as we go.

>>> import numpy as np
>>> import dask.array as da
>>> arrays = [da.from_array(np.ones((4, 4)), blockshape=(2, 2))
...           for i in range(3)]  # A small stack of dask arrays

>>> da.stack(arrays, axis=0).shape
(3, 4, 4)

>>> da.stack(arrays, axis=1).shape
(4, 3, 4)

>>> da.stack(arrays, axis=2).shape
(4, 4, 3)

This creates a new dimension with length equal to the number of slices.

Concatenate

We concatenate existing arrays into a new array, extending them along an existing dimension.

>>> import numpy as np
>>> import dask.array as da
>>> arrays = [da.from_array(np.ones((4, 4)), blockshape=(2, 2))
...           for i in range(3)]  # small stack of dask arrays

>>> da.concatenate(arrays, axis=0).shape
(12, 4)

>>> da.concatenate(arrays, axis=1).shape
(4, 12)

Case Study with Meteorological Data

To test this new functionality we download meteorological data from the European Centre for Medium-Range Weather Forecasts. In particular we have the temperature for the Earth every six hours for all of 2014 with a spatial resolution of a quarter degree. We download this data using this script (please don't hammer their servers unnecessarily). Thanks are due to Stephan Hoyer for pointing me to this dataset.

As a result, I now have a bunch of netCDF files!

$ ls
2014-01-01.nc3 2014-03-18.nc3 2014-06-02.nc3 2014-08-17.nc3 2014-11-01.nc3
2014-01-02.nc3 2014-03-19.nc3 2014-06-03.nc3 2014-08-18.nc3 2014-11-02.nc3
2014-01-03.nc3 2014-03-20.nc3 2014-06-04.nc3 2014-08-19.nc3 2014-11-03.nc3
2014-01-04.nc3 2014-03-21.nc3 2014-06-05.nc3 2014-08-20.nc3 2014-11-04.nc3
... ... ... ... ...

>>> import netCDF4
>>> t = netCDF4.Dataset('2014-01-01.nc3').variables['t2m']
>>> t.shape
(4, 721, 1440)

The shape corresponds to four measurements per day (24h / 6h), 721 measurements North/South (a quarter-degree grid spanning 180 degrees, including both poles), and 1440 measurements East/West (360 / 0.25). There are 365 files.

Great! We collect these under one logical dask array, concatenating along the time axis.

>>> from glob import glob
>>> filenames = sorted(glob('2014-*.nc3'))
>>> temps = [netCDF4.Dataset(fn).variables['t2m'] for fn in filenames]

>>> import dask.array as da
>>> arrays = [da.from_array(t, blockshape=(4, 200, 200)) for t in temps]
>>> x = da.concatenate(arrays, axis=0)

>>> x.shape
(1464, 721, 1440)

Now we can play with x as though it were a NumPy array.

avg = x.mean(axis=0)
diff = x[1000] - avg

If we want to actually compute these results we have a few options:

>>> diff.compute() # compute result, return as array, float, int, whatever is appropriate
>>> np.array(diff) # compute result and turn into `np.ndarray`
>>> diff.store(anything_that_supports_setitem) # For out-of-core storage

Alternatively, because many scientific Python libraries call np.array on inputs, we can just feed our da.Array objects directly into matplotlib (hooray for the __array__ protocol!):

>>> from matplotlib.pyplot import imshow
>>> imshow(x.mean(axis=0), cmap='bone')
>>> imshow(x[1000] - x.mean(axis=0), cmap='RdBu_r')

I suspect that the temperature scale is in Kelvin. It looks like the random day is taken during Northern Summer. Another fun one: let's look at the difference between the temperatures at 00:00 and at 12:00.

>>> imshow(x[::4].mean(axis=0) - x[2::4].mean(axis=0), cmap='RdBu_r')

Even though this looks and feels like NumPy we're actually operating off of disk using blocked algorithms. We execute these operations using only a small amount of memory. If these operations were computationally intense (they aren't) then we would also benefit from multiple cores.

What just happened

To be totally clear, the following steps just occurred:

  1. Open up a bunch of netCDF files and locate a temperature variable within each file. This is cheap.
  2. For each of those temperature variables create a da.Array object, specifying how we want to block up that variable. This is also cheap.
  3. Make a new da.Array by concatenating all of our da.Arrays for each day. This, like the other steps, is just book-keeping. We haven't loaded data or computed anything yet.
  4. Write numpy-style code x[::4].mean(axis=0) - x[2::4].mean(axis=0). This creates yet another da.Array with a more complex task graph. It takes a few hundred milliseconds to create this dictionary.
  5. Call imshow on our da.Array object.
  6. imshow calls np.array on its input; this starts the multi-core task scheduler.
  7. A flurry of chunks fly out of all the netCDF files. These chunks meet various NumPy functions and create new chunks. Well-organized magic occurs and an np.ndarray emerges.
  8. Matplotlib makes a pretty graph.

Problems that Popped Up

The threaded scheduler is introducing significant overhead in its planning. For this workflow the single-threaded naive scheduler is actually significantly faster. We'll have to find better solutions to reduce scheduling overhead.
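
As a rough sketch of how to compare the two, recent dask releases let you pick the scheduler with a keyword argument to compute (this keyword is an assumption for illustration here; the API available when this was written differed):

>>> diff.compute(scheduler='threads')       # default multi-threaded scheduler
>>> diff.compute(scheduler='synchronous')   # naive single-threaded scheduler, no threading overhead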

Conclusion

I hope that this shows off how dask.array can be useful when dealing with collections of on-disk arrays. As always I'm very happy to hear how we can make this project more useful for your work. If you have large n-dimensional datasets I'd love to hear about what you do and how dask.array can help. I can be reached either in the comments below or at [email protected]

Acknowledgements

First, other projects can already do this. In particular, if this seemed useful for your work then you should probably also know about Biggus, produced by the UK Met Office, which has been around for much longer than dask.array and is used in production.

Second, this post shows off work from the following people:

  1. Erik Welch (Continuum) wrote optimization passes to clean up dask graphs before execution.
  2. Wesley Emeneker (Continuum) wrote a good deal of the slicing code.
  3. Stephan Hoyer (Climate Corp) talked me through the application and pointed me to the data. If you'd like to see dask integrated with xray then you should definitely bug Stephan :)