This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.
We build distributed optimization algorithms with Dask. We show both simple examples and benchmarks from a nascent dask-glm library for generalized linear models. We also talk about the experience of learning Dask to do this kind of work.
This blogpost is co-authored by Chris White (Capital One) who knows optimization and Matthew Rocklin (Continuum Analytics) who knows distributed computing.
Many machine learning and statistics models (such as logistic regression) depend on convex optimization algorithms like Newton’s method, stochastic gradient descent, and others. These optimization algorithms are both pragmatic (they’re used in many applications) and mathematically interesting. As a result these algorithms have been the subject of study by researchers and graduate students around the world for years, both in academia and in industry.
Things got interesting about five or ten years ago when datasets grew beyond the size of working memory and “Big Data” became a buzzword. Parallel and distributed solutions for these algorithms have become the norm, and a researcher’s skillset now has to extend beyond linear algebra and optimization theory to include parallel algorithms and possibly even network programming, especially if you want to explore and create more interesting algorithms.
However, relatively few people understand both mathematical optimization theory and the details of distributed systems. Typically algorithmic researchers depend on the APIs of distributed computing libraries like Spark or Flink to implement their algorithms. In this blogpost we explore the extent to which Dask can be helpful in these applications. We approach this from two perspectives: that of a mathematical algorithms researcher (Chris) and that of a distributed systems developer (Matt).
Given knowledge of algorithms and of NumPy array computing, it is easy to write parallel algorithms with Dask. For a range of complicated algorithmic structures we have two straightforward choices:

1. Use the parallel multi-dimensional arrays in dask.array to build algorithms out of familiar operations like matrix multiplication, slicing, and reductions.
2. Build custom algorithms by hand with dask.delayed, tracking operations on individual in-memory chunks and the dependencies between them.
Coding up either of these options from scratch can be a daunting task, but with Dask it can be as simple as writing NumPy code.
Let’s build up an example of fitting a large linear regression model using both built-in array parallelism and fancier, more customized parallelization features that Dask offers. The dask.array module helps us to easily parallelize standard NumPy functionality using the same syntax – we’ll start there.
Dask has many ways to create dask arrays; to get started prototyping quickly, let’s create some random data in a way that should look familiar to NumPy users.
import dask
import dask.array as da
import numpy as np
from dask.distributed import Client
client = Client()
## create inputs with a bunch of independent normals
beta = np.random.random(100) # random beta coefficients, no intercept
X = da.random.normal(0, 1, size=(1000000, 100), chunks=(100000, 100))
y = X.dot(beta) + da.random.normal(0, 1, size=1000000, chunks=(100000,))
## make sure all chunks are ~equally sized
X, y = dask.persist(X, y)
client.rebalance([X, y])
Observe that X is a dask array stored in 10 chunks, each of size (100000, 100). Also note that X.dot(beta) runs smoothly for both numpy and dask arrays, so we can write code that basically works in either world.
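For example, you can confirm this chunk structure directly on the array:

>>> X.numblocks
(10, 1)
>>> X.npartitions
10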
Caveat: If X is a numpy array and beta is a dask array, X.dot(beta) will output an in-memory numpy array. This is usually not desirable as you want to carefully choose when to load something into memory. One fix is to use multipledispatch to handle odd edge cases; for a starting example, check out the dot code here.
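A minimal sketch of that kind of fix (an illustration only, not dask-glm's actual implementation; the generic dot function below and its chunking choice are our own) might look like:

from multipledispatch import dispatch

@dispatch(object, object)
def dot(A, B):
    # default: defer to the left operand's own dot method
    return A.dot(B)

@dispatch(np.ndarray, da.Array)
def dot(A, B):
    # wrap the in-memory numpy array first so the result stays a lazy dask array
    return da.from_array(A, chunks=A.shape).dot(B)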
Dask also has convenient visualization features built in that we will leverage; below we visualize our data in its 10 independent chunks:
If you can write iterative array-based algorithms in NumPy, then you can write iterative parallel algorithms in Dask
As we’ve already seen, Dask inherits much of the NumPy API that we are familiar with, so we can write simple NumPy-style iterative optimization algorithms that will leverage the parallelism dask.array has built-in already. For example, if we want to naively fit a linear regression model on the data above, we are trying to solve the following convex optimization problem:
\[\min_{\beta} \|y - X\beta\|_2^2\]
Recall that in non-degenerate situations this problem has a closed-form solution that is given by:
\[\beta^* = \left(X^T X\right)^{-1} X^T y\]
We can compute $\beta^*$ using the above formula with Dask:
## naive solution
beta_star = da.linalg.solve(X.T.dot(X), X.T.dot(y))
>>> abs(beta_star.compute() - beta).max()
0.0024817567237768179
Sometimes a direct solve is too costly, and we want to solve the above problem using only simple matrix-vector multiplications. To this end, let’s take this one step further and actually implement a gradient descent algorithm which exploits parallel matrix operations. Recall that gradient descent iteratively refines an initial estimate of beta via the update:
\[\beta^+ = \beta - \alpha \nabla f(\beta)\]
where $\alpha$ can be chosen based on a number of different “step-size” rules; for the purposes of exposition, we will stick with a constant step size.
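For this objective the gradient is Lipschitz continuous, which gives a concrete constant step size: with $f(\beta) = \|y - X\beta\|_2^2$ we have

\[\nabla f(\beta) = 2X^T(X\beta - y), \qquad L = \sigma_{\max}\!\left(2X^TX\right),\]

and any constant step size $0 < \alpha \le 1/L$ guarantees convergence. The largest singular value of $2X^TX$ is exactly what the SVD in the code below computes: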
## quick step-size calculation to guarantee convergence
_, s, _ = np.linalg.svd(2 * X.T.dot(X))
step_size = 1 / s.max() - 1e-8  # just under 1/L, the reciprocal of the largest singular value of 2 X^T X
## define some parameters
max_steps = 100
tol = 1e-8
beta_hat = np.zeros(100) # initial guess
for k in range(max_steps):
    Xbeta = X.dot(beta_hat)
    func = ((y - Xbeta)**2).sum()
    gradient = 2 * X.T.dot(Xbeta - y)

    ## Update
    obeta = beta_hat
    beta_hat = beta_hat - step_size * gradient
    new_func = ((y - X.dot(beta_hat))**2).sum()
    beta_hat, func, new_func = dask.compute(beta_hat, func, new_func)  # <--- Dask code

    ## Check for convergence
    change = np.absolute(beta_hat - obeta).max()
    if change < tol:
        break
>>> abs(beta_hat - beta).max()
0.0024817567259038942
It’s worth noting that almost all of this code is exactly the same as the equivalent NumPy code. Because Dask.array and NumPy share the same API it’s pretty easy for people who are already comfortable with NumPy to get started with distributed algorithms right away. The only things we had to change were how we produce our original data (da.random.normal instead of np.random.normal) and the call to dask.compute at the end of each update step. The dask.compute call tells Dask to go ahead and actually evaluate everything we’ve told it to do so far (Dask is lazy by default). Otherwise, all of the mathematical operations, matrix multiplies, slicing, and so on are exactly the same as with NumPy, except that Dask.array builds up a chunk-wise parallel computation for us and Dask.distributed can execute that computation in parallel.
To better appreciate all the scheduling that is happening in one update step of the above algorithm, here is a visualization of the computation necessary to compute beta_hat and the new function value new_func:
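(One way to render such a graph, assuming the optional graphviz dependency is installed, is to call dask.visualize on the same objects inside the loop, just before the dask.compute line; the filename here is our choice:)

dask.visualize(beta_hat, new_func, filename='update-step.svg')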
Each rectangle is an in-memory chunk of our distributed array and every circle is a numpy function call on those in-memory chunks. The Dask scheduler determines where and when to run all of these computations on our cluster of machines (or just on the cores of our laptop).
Now that we’ve seen how to use the built-in parallel algorithms offered by Dask.array, let’s go one step further and talk about writing more customized parallel algorithms. Many distributed “consensus”-based algorithms in machine learning rely on the idea that each chunk of data can be processed independently in parallel, with each worker sending its guess for the optimal parameter value to some master node. The master then computes a consensus estimate for the optimal parameters and reports that back to all of the workers. Each worker then processes its chunk of data given this new information, and the process continues until convergence.
From a parallel computing perspective this is a pretty simple map-reduce procedure. Any distributed computing framework should be able to handle this easily. We’ll use this as a very simple example for how to use Dask’s more customizable parallel options.
One such algorithm is the Alternating Direction Method of Multipliers, or ADMM for short. For the sake of this post, we will consider the work done by each worker to be a black box.
We will also be considering a regularized version of the problem above, namely:
\[\min_{\beta} \|y - X\beta\|_2^2 + \lambda \|\beta\|_1\]
At the end of the day, all we will do is:

1. map a black-box local_update step over each chunk of data in parallel,
2. combine the local estimates into a consensus estimate z using the shrinkage operator,
3. update the dual variables u, and
4. repeat for a fixed number of steps.
First we need to define some local functions that the chunks will use to update their individual parameter estimates, and import the black-box local_update step from dask_glm. We will also need the so-called shrinkage operator, which is the proximal operator for the $\ell_1$-norm in our problem.
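Written out, the shrinkage operator acts elementwise as

\[S_t(\beta)_i = \operatorname{sign}(\beta_i)\,\max\left(|\beta_i| - t,\ 0\right),\]

which is what the two np.maximum calls below compute: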
from dask_glm.algorithms import local_update

def local_f(beta, X, y, z, u, rho):
    return ((y - X.dot(beta)) ** 2).sum() + (rho / 2) * np.dot(beta - z + u,
                                                               beta - z + u)

def local_grad(beta, X, y, z, u, rho):
    return 2 * X.T.dot(X.dot(beta) - y) + rho * (beta - z + u)

def shrinkage(beta, t):
    return np.maximum(0, beta - t) - np.maximum(0, -beta - t)
## set some algorithm parameters
max_steps = 10
lamduh = 7.2
rho = 1.0
(n, p) = X.shape
nchunks = X.npartitions
XD = X.to_delayed().flatten().tolist() # A list of pointers to remote numpy arrays
yD = y.to_delayed().flatten().tolist() # ... one for each chunk
# the initial consensus estimate
z = np.zeros(p)
# an array of the individual "dual variables" and parameter estimates,
# one for each chunk of data
u = np.array([np.zeros(p) for i in range(nchunks)])
betas = np.array([np.zeros(p) for i in range(nchunks)])
for k in range(max_steps):

    # process each chunk in parallel, using the black-box 'local_update' magic
    new_betas = [dask.delayed(local_update)(xx, yy, bb, z, uu, rho,
                                            f=local_f,
                                            fprime=local_grad)
                 for xx, yy, bb, uu in zip(XD, yD, betas, u)]
    new_betas = np.array(dask.compute(*new_betas))

    # everything else is NumPy code occurring at "master"
    beta_hat = 0.9 * new_betas + 0.1 * z

    # create consensus estimate
    zold = z.copy()
    ztilde = np.mean(beta_hat + np.array(u), axis=0)
    z = shrinkage(ztilde, lamduh / (rho * nchunks))

    # update dual variables
    u += beta_hat - z
>>> # Number of coefficients zeroed out due to L1 regularization
>>> print((z == 0).sum())
12
There is of course a little bit more work occurring in the above algorithm, but it should be clear that the distributed operations are not one of the difficult pieces. Using dask.delayed we were able to express a simple map-reduce algorithm like ADMM with similarly simple Python for loops and delayed function calls. Dask.delayed is keeping track of all of the function calls we wanted to make and what other function calls they depend on. For example all of the local_update calls can happen independent of each other, but the consensus computation blocks on all of them.
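As a tiny, standalone illustration of that bookkeeping (a toy example of ours, separate from the ADMM code above): the two inc calls below are independent and can run in parallel, while the final sum blocks on both of them.

@dask.delayed
def inc(x):
    return x + 1

a = inc(1)                           # these two calls are independent...
b = inc(2)                           # ...so Dask can run them in parallel
total = dask.delayed(sum)([a, b])    # this call depends on both
print(total.compute())               # 5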
We hope that both parallel algorithms shown above (gradient descent, ADMM) were straightforward to readers with an optimization background. These implementations run well on a laptop, a single multi-core workstation, or a thousand-node cluster if necessary. We’ve been building somewhat more sophisticated implementations of these algorithms (and others) in dask-glm. They are more sophisticated from an optimization perspective (stopping criteria, step size, asynchronicity, and so on) but remain just as simple from a distributed computing perspective.
We compare dask-glm implementations against Scikit-Learn on a laptop, and then show them running on a cluster.
A reproducible notebook is available here
We’re building more sophisticated versions of the algorithms above in dask-glm. This project has convex optimization algorithms for gradient descent, proximal gradient descent, Newton’s method, and ADMM. These implementations extend the implementations above by also thinking about stopping criteria, step sizes, and other niceties that we avoided above for simplicity.
In this section we show off these algorithms by performing a simple numerical experiment that compares proximal gradient descent and ADMM alongside Scikit-Learn’s LogisticRegression and SGD implementations on a single machine (a personal laptop), and then follows up by scaling the dask-glm options to a moderate cluster.
Disclaimer: These experiments are crude. We’re using artificial data, and we’re not tuning parameters or even finding parameters at which these algorithms produce results of the same accuracy. The goal of this section is just to give a general feeling of how things compare.
We create data
## size of problem (no. observations)
N = 8_000_000
chunks = 1_000_000
seed = 20009
beta = (np.random.random(15) - 0.5) * 3
X = da.random.random((N, len(beta)), chunks=chunks)
y = make_y(X, beta=np.array(beta), chunks=chunks)  # make_y is a helper defined in the notebook linked above
X, y = dask.persist(X, y)
client.rebalance([X, y])
And run each of our algorithms as follows:
## imports for the snippets below (we assume proximal_grad and admm live in
## dask_glm.algorithms, alongside local_update used earlier)
from dask_glm.algorithms import admm, proximal_grad
from sklearn.linear_model import LogisticRegression, SGDClassifier

# Dask-GLM Proximal Gradient
result = proximal_grad(X, y, lamduh=alpha)  # alpha is the chosen regularization strength

# Dask-GLM ADMM
X2 = X.rechunk((100_000, None)).persist()  # ADMM prefers smaller chunks
y2 = y.rechunk(100_000).persist()
result = admm(X2, y2, lamduh=alpha)

# Scikit-Learn LogisticRegression
nX, ny = dask.compute(X, y)  # sklearn wants numpy arrays
result = LogisticRegression(penalty='l1', C=1).fit(nX, ny).coef_

# Scikit-Learn Stochastic Gradient Descent
result = SGDClassifier(loss='log',
                       penalty='l1',
                       l1_ratio=1,
                       n_iter=10,
                       fit_intercept=False).fit(nX, ny).coef_
We then compare with the $L_{\infty}$ norm (the largest absolute difference).
abs(result - beta).max()
Times and $L_\infty$ distance from the true “generative beta” for these parameters are shown in the table below:
Algorithm             Error     Duration (s)
Proximal Gradient     0.0227    128
ADMM                  0.0125    34.7
LogisticRegression    0.0132    79
SGDClassifier         0.0456    29.4
Again, please don’t take these numbers too seriously: these algorithms all solve regularized problems, so we don’t expect the results to necessarily be close to the underlying generative beta (even asymptotically). The numbers above are meant to demonstrate that they all return results which were roughly the same distance from the beta above. Also, dask-glm is using a full four-core laptop while Scikit-Learn is restricted to a single core.
In the sections below we include profile plots for proximal gradient and ADMM. These show the operations that each of eight threads was doing over time. You can mouse over rectangles/tasks and zoom in using the zoom tools in the upper right. You can see the difference in complexity of the algorithms. ADMM is much simpler from Dask’s perspective but also saturates hardware better for this chunksize.
The general takeaway here is that dask-glm performs comparably to Scikit-Learn on a single machine. If your problem fits in memory on a single machine you should continue to use Scikit-Learn and Statsmodels. The real benefit of the dask-glm algorithms is that they scale and can run efficiently on data that is larger than memory by operating from disk on a single computer or on a cluster of computers working together.
As a demonstration, we run a larger version of the data above on a cluster of eight m4.2xlarges on EC2 (8 cores and 30GB of RAM each).
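(On the client side the only change for a cluster run is pointing the dask.distributed Client at the cluster’s scheduler; the address below is a placeholder, not the scheduler we actually used:)

from dask.distributed import Client
client = Client('scheduler-address:8786')  # placeholder scheduler address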
We create a larger dataset with 800,000,000 rows and 15 columns across eight processes.
N = 800_000_000
chunks = 10_000_000
seed = 20009
beta = (np.random.random(15) - 0.5) * 3
X = da.random.random((N, len(beta)), chunks=chunks)
y = make_y(X, beta=np.array(beta), chunks=chunks)
X, y = dask.persist(X, y)
We then run the same proximal_grad and admm operations from before:
# Dask-GLM Proximal Gradient
result = proximal_grad(X, y, lamduh=alpha)

# Dask-GLM ADMM
X2 = X.rechunk((1_000_000, None)).persist()  # ADMM prefers smaller chunks
y2 = y.rechunk(1_000_000).persist()
result = admm(X2, y2, lamduh=alpha)
Proximal grad completes in around seventeen minutes while ADMM completes in around four minutes. Profiles for the two computations are included below:
We include only the first few iterations here. Otherwise this plot is several megabytes.
These both obtained similar $L_{\infty}$ errors to what we observed before.
Algorithm             Error      Duration (s)
Proximal Gradient     0.0306     1020
ADMM                  0.00159    270
Although this time we had to be careful about a couple of things:
The algorithms in Dask-GLM are new and need development, but are in a usable state for people comfortable operating at this technical level. Additionally, we would like to attract other mathematical and algorithmic developers to this work. We’ve found that Dask provides a nice balance between being flexible enough to support interesting algorithms and being managed enough to be usable by researchers without a strong background in distributed systems. In this section we’re going to discuss the things that we learned from both Chris’ (mathematical algorithms) and Matt’s (distributed systems) perspectives and then talk about possible future work. We encourage people to pay attention to future work; we’re open to collaboration and think that this is a good opportunity for new researchers to meaningfully engage.
This work triggered a number of concrete changes within the Dask library:
There are a number of things we would like to do, both in terms of measurement and for the dask-glm project itself. We welcome people to voice their opinions (and join development) on the following issues:
What is your perspective here, gentle reader? Both Matt and Chris can use help on this project. We hope that some of the issues above provide seeds for community engagement. We welcome other questions, comments, and contributions either as GitHub issues or comments below.
Thanks also go to Hussain Sultan (Capital One) and Tom Augspurger for collaboration on Dask-GLM and to Will Warner (Continuum) for reviewing and editing this post.