Submit New Event

Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.

Submit News Feature

Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.

Contribute a Blog

Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.

Sign up for Newsletter

Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.
Mar 18, 2019

Dask and the __array_function__ protocol

By

Summary

Dask is versatile for analytics parallelism, but there is still one issue toleverage it to a broader spectrum: allowing it to transparently work withNumPy-like libraries. We have previously discussedhow to work withGPU Dask Arrays,but limited to the scope of the array’s member methods sharing a NumPy-likeinterface, for example the .sum() method, thus, calling general functionalityfrom NumPy’s library wasn’t still possible. NumPy recently addressed this issuein NEP-18with the introduction of the __array_function__ protocol. In short, theprotocol allows a NumPy function call to dispatch the appropriate NumPy-likelibrary implementation, depending on the array type given as input, thusallowing Dask to remain agnostic of such libraries, internally calling just theNumPy function, which automatically handles dispatching of the appropriatelibrary implementation, for example,CuPy or Sparse.

To understand what’s the end goal of this change, consider the followingexample:

import numpy as np
import dask.array as da

x = np.random.random((5000, 1000))

d = da.from_array(x, chunks=(1000, 1000))

u, s, v = np.linalg.svd(d)

Now consider we want to speedup the SVD computation of a Dask array and offloadthat work to a CUDA-capable GPU, we ultimately want to simply replace the NumPyarray x by a CuPy array and let NumPy do its magic via__array_function__ protocol and dispatch the appropriate CuPy linear algebraoperations under the hood:

import numpy as np
import cupy
import dask.array as da

x = cupy.random.random((5000, 1000))

d = da.from_array(x, chunks=(1000, 1000))

u, s, v = np.linalg.svd(d)

We could do the same for a Sparse array, or any other NumPy-like array thatsupports the __array_function__ protocol and the computation that we aretrying to perform. In the next section, we will take a look at potentialperformance benefits that the protocol helps leveraging.

Note that the features described in this post are still experimental, somestill under development and review. For a more detailed discussion on theactual progress of __array_function__, please refer to the Issues section.

Performance

Before going any further, assume the following hardware is utilized for allperformance results described in this entire post:

  • CPU: 6-core (12-threads) Intel Core i7-7800X @ 3.50GHz
  • Main memory: 16 GB
  • GPU: NVIDIA Quadro GV100
  • OpenBLAS 0.2.18
  • cuBLAS 9.2.174
  • cuSOLVER 9.2.148

Let’s now check an example to see potential performance benefits of the__array_function__ protocol with Dask when using CuPy as a backend. Let’sstart by creating all the arrays that we will use for computing an SVD later.Please note that my focus here is how Dask can leverage compute performance,therefore I’m ignoring in this example the time spent on creating or copyingthe arrays between CPU and GPU.

import numpy as np
import cupy
import dask.array as da

x = np.random.random((10000, 1000))
y = cupy.array(x)

dx = da.from_array(x, chunks=(5000, 1000))
dy = da.from_array(y, chunks=(5000, 1000), asarray=False)

Seen above we have four arrays:

  • x: a NumPy array in main memory;
  • y: a CuPy array in GPU memory;
  • dx: a NumPy array wrapped in a Dask array;
  • dy: a copy of a CuPy array wrapped in a Dask array; wrapping a CuPyarray in a Dask array as a view (asarray=True) is not supported yet.

Compute SVD on a NumPy array

We can then start by computing the SVD of x using NumPy, thus, it’sprocessed on CPU in a single thread:

u, s, v = np.linalg.svd(x)

The timing information I obtained after that looks like the following:

CPU times: user 3min 10s, sys: 347 ms, total: 3min 11s
Wall time: 3min 11s

Over 3 minutes seems a bit too slow, so now the question is: Can we do better,and more importantly, without having to change our entire code?

The answer to this question is: Yes, we can.

Let’s look now at other results.

Compute SVD on the NumPy array wrapped in Dask array

First, of all, this is what you had to do before the introduction of the__array_function__ protocol:

u, s, v = da.linalg.svd(dx)
u, s, v = dask.compute(u, s, v)

The code above might have been very prohibitive for several projects, since oneneeds to call the proper library dispatcher in addition to passing the correctarray. In other words, one would need to find all NumPy calls in the code andreplace those by the correct library’s function call, depending on the inputarray type. After __array_function__, the same NumPy function can becalled, using the Dask array dx as input:

u, s, v = np.linalg.svd(dx)
u, s, v = dask.compute(u, s, v)

Note: Dask defers computation of results until its consumption, therefore weneed to call the dask.compute() function on result arrays to actually computethem.

Let’s now take a look at the timing information:

CPU times: user 1min 23s, sys: 460 ms, total: 1min 23s
Wall time: 1min 13s

Now, without changing any code, besides the wrapping of the NumPy array as aDask array, we can see a speedup of 2x. Not too bad. But let’s go back to ourprevious question: Can we do better?

Compute SVD on the CuPy array

We can do the same as for the Dask array now and simply call NumPy’s SVDfunction on the CuPy array y:

u, s, v = np.linalg.svd(y)

The timing information we get now is the following:

CPU times: user 17.3 s, sys: 1.81 s, total: 19.1 s
Wall time: 19.1 s

We now see a 4-5x speedup with no change in internal calls whatsoever! This isexactly the sort of benefit that we expect to leverage with the__array_function__ protocol, speeding up existing code, for free!

Let’s go back to our original question one last time: Can we do better?

Compute SVD on the CuPy array wrapped in Dask array

We can now take advantage of the benefits of Dask data chunk splitting andthe CuPy GPU implementation, in an attempt to keep our GPU busy as much aspossible, this remains as simple as:

u, s, v = np.linalg.svd(dy)
u, s, v = dask.compute(u, s, v)

For which we get the following timing:

CPU times: user 8.97 s, sys: 653 ms, total: 9.62 s
Wall time: 9.45 s

Giving us another 2x speedup over the single-threaded CuPy SVD computing.

To conclude, we started from over 3 minutes and are now down to under 10seconds by simply dispatching the work on a different array.

Application

We will now talk a bit about potential applications of the__array_function__ protocol. For this, we will discuss theDask-GLM library, used for fittingGeneralized Linear Models on large datasets. It’s built on top of Dask andoffers an API compatible with scikit-learn.

Before the introduction of the __array_function__ protocol, we would needto rewrite most of its internal implementation for each and every NumPy-likelibrary that we wished to use as a backend, therefore, we would need aspecialization of the implementation for Dask, another for CuPy and yetanother for Sparse. Now, thanks to all the functionality that these librariesshare through compatible interface, we don’t have to change the implementationat all, we simply pass a different array type as input, as simple as that.

Example with scikit-learn

To demonstrate the ability we acquired, let’s consider the followingscikit-learn example (based on the original examplehere):

import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression

N = 1000

# x from 0 to N
x = N * np.random.random((40000, 1))

# y = a*x + b with noise
y = 0.5 * x + 1.0 + np.random.normal(size=x.shape)

# create a linear regression model
est = LinearRegression()

We can then fit the model,

est.fit(x, y)

and obtain its time measurements:

CPU times: user 3.4 ms, sys: 0 ns, total: 3.4 ms
Wall time: 2.3 ms

We can then use it for prediction on some test data,

# predict y from the data
x_new = np.linspace(0, N, 100)
y_new = est.predict(x_new[:, np.newaxis])

And also check its time measurements:

CPU times: user 1.16 ms, sys: 680 µs, total: 1.84 ms
Wall time: 1.58 ms

And finally plot the results:

# plot the results
plt.figure(figsize=(4, 3))
ax = plt.axes()
ax.scatter(x, y, linewidth=3)
ax.plot(x_new, y_new, color='black')

ax.set_facecolor((1.0, 1.0, 0.42))

ax.set_xlabel('x')
ax.set_ylabel('y')

ax.axis('tight')

plt.show()

Example with Dask-GLM

The only thing we have to change from the code before is the first block, wherewe import libraries and create arrays:

import numpy as np
from dask_glm.estimators import LinearRegression
import matplotlib.pyplot as plt

N = 1000

# x from 0 to N
x = N * np.random.random((40000, 1))

# y = a*x + b with noise
y = 0.5 * x + 1.0 + np.random.normal(size=x.shape)

# create a linear regression model
est = LinearRegression(solver='lbfgs')

The rest of the code and also the plot look alike the previous scikit-learnexample, so we’re ommitting those here for brevity. Note also that we couldhave called LinearRegression() without any arguments, but for this examplewe chose thelbfgssolver, that converges reasonably fast.

We can also have a look at the timing results for fitting, followed by thosefor predicting with Dask-GLM:

# Fitting
CPU times: user 9.66 ms, sys: 116 µs, total: 9.78 ms
Wall time: 8.94 ms

# Predicting
CPU times: user 130 µs, sys: 327 µs, total: 457 µs
Wall time: 1.06 ms

If instead we want to use CuPy to compute, we have to change only 3 lines,importing cupy instead of numpy, and the two lines where we create therandom arrays, replacing them to cupy.random insted of np.random. Theblock should then look like this:

import cupy
from dask_glm.estimators import LinearRegression
import matplotlib.pyplot as plt

N = 1000

# x from 0 to N
x = N * cupy.random.random((40000, 1))

# y = a*x + b with noise
y = 0.5 * x + 1.0 + cupy.random.normal(size=x.shape)

# create a linear regression model
est = LinearRegression(solver='lbfgs')

And the timing results we obtain in this scenario are:

# Fitting
CPU times: user 151 ms, sys: 40.7 ms, total: 191 ms
Wall time: 190 ms

# Predicting
CPU times: user 1.91 ms, sys: 778 µs, total: 2.69 ms
Wall time: 1.37 ms

For the simple example chosen for this post, scikit-learn outperforms Dask-GLMusing both NumPy and CuPy arrays. There may exist several reasons thatcontribute to this, and while we didn’t dive deep into understanding the exactreasons and their extent, we could cite some likely possibilities:

  • scikit-learn may be using solvers that converge faster;
  • Dask-GLM is entirely built on top of Dask, while scikit-learn may beheavily optimized internally;
  • Too many synchronization steps and data transfer could occur for smalldatasets with CuPy.

Performance for different Dask-GLM solvers

To verify how Dask-GLM with NumPy arrays would compare with CuPy arrays, wealso did some logistic regression benchmarking of Dask-GLM solvers. The resultsbelow were obtained from a training dataset with 102,103, …, 106 features of 100 dimensions, and matchingnumber of test features.

Note: we are intentionally omitting results for Dask arrays, as we haveidentified a potential bug thatcauses Dask arrays not to converge.

From the results observed in the graphs above we can see that CuPy can be oneorder of magnitude faster than NumPy for fitting with any of the Dask-GLMsolvers. Please note also that both axis are given in logarithmic scale foreasier visualization.

Another interesting effect that can be seen is how converging may take longerfor small number of samples. However, as we would normally hope for, computetime required to converge scales linearly to the number of samples.

Prediction with CuPy, as seen above, can be proportionally much faster thanNumPy, staying mostly constant for all solvers, and around 3-4 orders ofmagnitude faster.

Issues

In this section we describe the work that has been done and is still ongoingsince February, 2019, towards enabling the features described in previoussections. If you are not interested in all the details, feel free to completelyskip this.

Fixed Issues

Since early February, 2019, substantial progress has been made towards deepersupport of the __array_function__ protocol in the different projects,this trend is still going on and will continue in March. Below we see a listof issues that have been fixed or are in the process of review:

Known Issues

Currently, one of the biggest issues we are tackling relates to theDask issue #4490 we firstidentified when calling Dask’s diag() on a CuPy array. This requires somechange on the Dask Array class, and subsequent changes throughout largeparts of the Dask codebase. I will not go into too much detail here, but theway we are handling this issue is by adding a new attribute _meta to DaskArray in replacement of the simple dtype that currently exists. Thisnew attribute will not only hold the dtype information, but also an emptyarray of the backend type used to create the Array in the first place, thusallowing us to internally reconstruct arrays of the backend type, withouthaving to know explicitly whether it’s a NumPy, CuPy, Sparse or any otherNumPy-like array. For additional details, please refer to Dask Issue#2977.

We have identified some more issues with ongoing discussions:

Future Work

There are several possibilities for a richer experience with Dask, some of whichcould be very interesting in the short and mid-term are:

  1. Use Dask-cuDF alongside withDask-GLM to present interesting realistic applications of the wholeecosystem;
  2. More comprehensive examples and benchmarks for Dask-GLM;
  3. Support for more models inDask-GLM;
  4. A deeper look into the Dask-GLM versus scikit-learn performance;
  5. Profile CuPy’s performance of matrix-matrix multiplication operations(GEMM), compare to matrix-vector multiplication operations (GEMV) fordistributed Dask operation.