## Submit New Event

Oops! Something went wrong while submitting the form.

## Submit News Feature

Oops! Something went wrong while submitting the form.

## Contribute a Blog

Oops! Something went wrong while submitting the form.

Oops! Something went wrong while submitting the form.
Mar 22, 2017

## Developing Convex Optimization Algorithms in Dask

By

This work is supported by Continuum Analytics,the XDATA Program,and the Data Driven Discovery Initiative from the MooreFoundation.

## Summary

We build distributed optimization algorithms with Dask. We show both simpleexamples and also benchmarks from a nascentdask-glm library for generalized linearmodels. We also talk about the experience of learning Dask to do this kindof work.

This blogpost is co-authored by Chris White(Capital One) who knows optimization and MatthewRocklin (Continuum Analytics) who knowsdistributed computing.

## Introduction

Many machine learning and statistics models (such as logistic regression) dependon convex optimization algorithms like Newton’s method, stochastic gradientdescent, and others. These optimization algorithms are both pragmatic (they’reused in many applications) and mathematically interesting. As a result thesealgorithms have been the subject of study by researchers and graduate studentsaround the world for years both in academia and in industry.

Things got interesting about five or ten years ago when datasets grew beyondthe size of working memory and “Big Data” became a buzzword. Parallel anddistributed solutions for these algorithms have become the norm, and aresearcher’s skillset now has to extend beyond linear algebra and optimizationtheory to include parallel algorithms and possibly even network programming,especially if you want to explore and create more interesting algorithms.

However, relatively few people understand both mathematical optimization theoryand the details of distributed systems. Typically algorithmic researchersdepend on the APIs of distributed computing libraries like Spark or Flink toimplement their algorithms. In this blogpost we explore the extent to whichDask can be helpful in these applications. We approach this from twoperspectives:

1. Algorithmic researcher (Chris): someone who knows optimization anditerative algorithms like Conjugate Gradient, Dual Ascent, or GMRES butisn’t so hot on distributed computing topics like sockets, MPI, loadbalancing, and so on
2. Distributed systems developer (Matt): someone who knows how to movebytes around and keep machines busy but doesn’t know the right way to do aline search or handle a poorly conditioned matrix

Given knowledge of algorithms and of NumPy array computing it is easy to write parallel algorithms with Dask. For a range of complicated algorithmic structures we have two straightforward choices:

1. Use parallel multi-dimensional arrays to construct algorithms from common operations like matrix multiplication, SVD, and so on. This mirrors mathematical algorithms well but lacks some flexibility.
2. Create algorithms by hand that track operations on individual chunks of in-memory data and dependencies between them. This is very flexible but requires a bit more care.

Coding up either of these options from scratch can be a daunting task, but with Dask it can be as simple as writing NumPy code.

Let’s build up an example of fitting a large linear regression model using both built-in array parallelism and fancier, more customized parallelization features that Dask offers. The dask.array module helps us to easily parallelize standard NumPy functionality using the same syntax – we’ll start there.

### Data Creation

Dask has many ways to create dask arrays; to get us started quickly prototyping let’s create some random data in a way that should look familiar to NumPy users.

import numpy as np

client = Client()

## create inputs with a bunch of independent normals
beta = np.random.random(100) # random beta coefficients, no intercept
X = da.random.normal(0, 1, size=(1000000, 100), chunks=(100000, 100))
y = X.dot(beta) + da.random.normal(0, 1, size=1000000, chunks=(100000,))

## make sure all chunks are ~equally sized
client.rebalance([X, y])

Observe that X is a dask array stored in 10 chunks, each of size (100000, 100). Also note that X.dot(beta) runs smoothly for both numpy and dask arrays, so we can write code that basically works in either world.

Caveat: If X is a numpy array and beta is a dask array, X.dot(beta) will output an in-memory numpy array. This is usually not desirable as you want to carefully choose when to load something into memory. One fix is to use multipledispatch to handle odd edge cases; for a starting example, check out the dot code here.

Dask also has convenient visualization features built in that we will leverage; below we visualize our data in its 10 independent chunks:

### Array Programming

If you can write iterative array-based algorithms in NumPy, then you can write iterative parallel algorithms in Dask

As we’ve already seen, Dask inherits much of the NumPy API that we are familiar with, so we can write simple NumPy-style iterative optimization algorithms that will leverage the parallelism dask.array has built-in already. For example, if we want to naively fit a linear regression model on the data above, we are trying to solve the following convex optimization problem:

$\min_{\beta} \|y - X\beta\|_2^2$

Recall that in non-degenerate situations this problem has a closed-form solution that is given by:

$\beta^* = \left(X^T X\right)^{-1} X^T y$

We can compute $\beta^*$ using the above formula with Dask:

## naive solution
beta_star = da.linalg.solve(X.T.dot(X), X.T.dot(y))

>>> abs(beta_star.compute() - beta).max()
0.0024817567237768179

Sometimes a direct solve is too costly, and we want to solve the above problem using only simple matrix-vector multiplications. To this end, let’s take this one step further and actually implement a gradient descent algorithm which exploits parallel matrix operations. Recall that gradient descent iteratively refines an initial estimate of beta via the update:

$\beta^+ = \beta - \alpha \nabla f(\beta)$

where $\alpha$ can be chosen based on a number of different “step-size” rules; for the purposes of exposition, we will stick with a constant step-size:

## quick step-size calculation to guarantee convergence
_, s, _ = np.linalg.svd(2 * X.T.dot(X))
step_size = 1 / s - 1e-8

## define some parameters
max_steps = 100
tol = 1e-8
beta_hat = np.zeros(100) # initial guess

for k in range(max_steps):
Xbeta = X.dot(beta_hat)
func = ((y - Xbeta)**2).sum()
gradient = 2 * X.T.dot(Xbeta - y)

## Update
obeta = beta_hat
beta_hat = beta_hat - step_size * gradient
new_func = ((y - X.dot(beta_hat))**2).sum()

## Check for convergence
change = np.absolute(beta_hat - obeta).max()

if change < tol:
break

>>> abs(beta_hat - beta).max()
0.0024817567259038942

It’s worth noting that almost all of this code is exactly the same as the equivalent NumPy code. Because Dask.array and NumPy share the same API it’s pretty easy for people who are already comfortable with NumPy to get started with distributed algorithms right away. The only thing we had to change was how we produce our original data (da.random.normal instead of np.random.normal) and the call to dask.compute at the end of the update state. The dask.compute call tells Dask to go ahead and actually evaluate everything we’ve told it to do so far (Dask is lazy by default). Otherwise, all of the mathematical operations, matrix multiplies, slicing, and so on are exactly the same as with Numpy, except that Dask.array builds up a chunk-wise parallel computation for us and Dask.distributed can execute that computation in parallel.

To better appreciate all the scheduling that is happening in one update step of the above algorithm, here is a visualization of the computation necessary to compute beta_hat and the new function value new_func:

Each rectangle is an in-memory chunk of our distributed array and every circleis a numpy function call on those in-memory chunks. The Dask schedulerdetermines where and when to run all of these computations on our cluster ofmachines (or just on the cores of our laptop).

Now that we’ve seen how to use the built-in parallel algorithms offered by Dask.array, let’s go one step further and talk about writing more customized parallel algorithms. Many distributed “consensus” based algorithms in machine learning are based on the idea that each chunk of data can be processed independently in parallel, and send their guess for the optimal parameter value to some master node. The master then computes a consensus estimate for the optimal parameters and reports that back to all of the workers. Each worker then processes their chunk of data given this new information, and the process continues until convergence.

From a parallel computing perspective this is a pretty simple map-reduce procedure. Any distributed computing framework should be able to handle this easily. We’ll use this as a very simple example for how to use Dask’s more customizable parallel options.

One such algorithm is the Alternating Direction Method of Multipliers, or ADMM for short. For the sake of this post, we will consider the work done by each worker to be a black box.

We will also be considering a regularized version of the problem above, namely:

$\min_{\beta} \|y - X\beta\|_2^2 + \lambda \|\beta\|_1$

At the end of the day, all we will do is:

• create NumPy functions which define how each chunk updates its parameter estimates
• wrap those functions in dask.delayed
• call dask.compute and process the individual estimates, again using NumPy

First we need to define some local functions that the chunks will use to update their individual parameter estimates, and import the black box local_update step from dask_glm; also, we will need the so-called shrinkage operator (which is the proximal operator for the $l1$-norm in our problem):

def local_f(beta, X, y, z, u, rho):
return ((y - X.dot(beta)) **2).sum() + (rho / 2) * np.dot(beta - z + u,
beta - z + u)

def local_grad(beta, X, y, z, u, rho):
return 2 * X.T.dot(X.dot(beta) - y) + rho * (beta - z + u)

def shrinkage(beta, t):
return np.maximum(0, beta - t) - np.maximum(0, -beta - t)

## set some algorithm parameters
max_steps = 10
lamduh = 7.2
rho = 1.0

(n, p) = X.shape
nchunks = X.npartitions

XD = X.to_delayed().flatten().tolist() # A list of pointers to remote numpy arrays
yD = y.to_delayed().flatten().tolist() # ... one for each chunk

# the initial consensus estimate
z = np.zeros(p)

# an array of the individual "dual variables" and parameter estimates,
# one for each chunk of data
u = np.array([np.zeros(p) for i in range(nchunks)])
betas = np.array([np.zeros(p) for i in range(nchunks)])

for k in range(max_steps):

# process each chunk in parallel, using the black-box 'local_update' magic
new_betas = [dask.delayed(local_update)(xx, yy, bb, z, uu, rho,
f=local_f,
for xx, yy, bb, uu in zip(XD, yD, betas, u)]

# everything else is NumPy code occurring at "master"
beta_hat = 0.9 * new_betas + 0.1 * z

# create consensus estimate
zold = z.copy()
ztilde = np.mean(beta_hat + np.array(u), axis=0)
z = shrinkage(ztilde, lamduh / (rho * nchunks))

# update dual variables
u += beta_hat - z

>>> # Number of coefficients zeroed out due to L1 regularization
>>> print((z == 0).sum())
12

There is of course a little bit more work occurring in the above algorithm, but it should be clear that the distributed operations are not one of the difficult pieces. Using dask.delayed we were able to express a simple map-reduce algorithm like ADMM with similarly simple Python for loops and delayed function calls. Dask.delayed is keeping track of all of the function calls we wanted to make and what other function calls they depend on. For example all of the local_update calls can happen independent of each other, but the consensus computation blocks on all of them.

We hope that both parallel algorithms shown above (gradient descent, ADMM) werestraightforward to someone reading with an optimization background. Theseimplementations run well on a laptop, a single multi-core workstation, or athousand-node cluster if necessary. We’ve been building somewhat moresophisticated implementations of these algorithms (and others) indask-glm. They are more sophisticated from anoptimization perspective (stopping criteria, step size, asynchronicity, and so on)but remain as simple from a distributed computing perspective.

## Experiment

We compare dask-glm implementations against Scikit-learn on a laptop, and thenshow them running on a cluster.

Reproducible notebook is available here

We’re building more sophisticated versions of the algorithms above indask-glm. This project has convexoptimization algorithms for gradient descent, proximal gradient descent,Newton’s method, and ADMM. These implementations extend the implementationsabove by also thinking about stopping criteria, step sizes, and other nicetiesthat we avoided above for simplicity.

In this section we show off these algorithms by performing a simple numericalexperiment that compares the numerical performance of proximal gradient descentand ADMM alongside Scikit-Learn’s LogisticRegression and SGD implementationson a single machine (a personal laptop) and then follows up by scaling thedask-glm options to a moderate cluster.

Disclaimer: These experiments are crude. We’re using artificial data, we’re not tuningparameters or even finding parameters at which these algorithms are producingresults of the same accuracy. The goal of this section is just to give ageneral feeling of how things compare.

We create data

## size of problem (no. observations)
N = 8e6
chunks = 1e6
seed = 20009
beta = (np.random.random(15) - 0.5) * 3

X = da.random.random((N,len(beta)), chunks=chunks)
y = make_y(X, beta=np.array(beta), chunks=chunks)

client.rebalance([X, y])

And run each of our algorithms as follows:

X2 = X.rechunk((1e5, None)).persist() # ADMM prefers smaller chunks
y2 = y.rechunk(1e5).persist()

# Scikit-Learn LogisticRegression
nX, ny = dask.compute(X, y) # sklearn wants numpy arrays
result = LogisticRegression(penalty='l1', C=1).fit(nX, ny).coef_

result = SGDClassifier(loss='log',
penalty='l1',
l1_ratio=1,
n_iter=10,
fit_intercept=False).fit(nX, ny).coef_

We then compare with the $L_{\infty}$ norm (largest different value).

abs(result - beta).max()

Times and $L_\infty$ distance from the true “generative beta” for these parameters are shown in the table below:

Algorithm Error Duration (s) Proximal Gradient 0.0227 128 ADMM 0.0125 34.7 LogisticRegression 0.0132 79 SGDClassifier 0.0456 29.4

Again, please don’t take these numbers too seriously: these algorithms all solveregularized problems, so we don’t expect the results to necessarily be close to theunderlying generative beta (even asymptotically). The numbers above are meant todemonstrate that they all return results which were roughly the same distancefrom the beta above. Also, Dask-glm is using a full four-core laptop whileSKLearn is restricted to use a single core.

In the sections below we include profile plots for proximal gradient and ADMM.These show the operations that each of eight threads was doing over time. Youcan mouse-over rectangles/tasks and zoom in using the zoom tools in the upperright. You can see the difference in complexity of the algorithms. ADMM ismuch simpler from Dask’s perspective but also saturates hardware better forthis chunksize.

### Profile Plot for Proximal Gradient Descent

The general takeaway here is that dask-glm performs comparably to Scikit-Learnon a single machine. If your problem fits in memory on a single machine youshould continue to use Scikit-Learn and Statsmodels. The real benefit to thedask-glm algorithms is that they scale and can run efficiently on data that islarger-than-memory by operating from disk on a single computer or on acluster of computers working together.

### Cluster Computing

As a demonstration, we run a larger version of the data above on a cluster ofeight m4.2xlarges on EC2 (8 cores and 30GB of RAM each.)

We create a larger dataset with 800,000,000 rows and 15 columns across eightprocesses.

N = 8e8
chunks = 1e7
seed = 20009
beta = (np.random.random(15) - 0.5) * 3

X = da.random.random((N,len(beta)), chunks=chunks)
y = make_y(X, beta=np.array(beta), chunks=chunks)

X2 = X.rechunk((1e6, None)).persist() # ADMM prefers smaller chunks
y2 = y.rechunk(1e6).persist()

Proximal grad completes in around seventeen minutes while ADMM completes inaround four minutes. Profiles for the two computations are included below:

#### Profile Plot for Proximal Gradient Descent

We include only the first few iterations here. Otherwise this plot is severalmegabytes.

These both obtained similar $L_{\infty}$ errors to what we observed before.

Although this time we had to be careful about a couple of things:

1. We explicitly deleted the old data after rechunking (ADMM prefers differentchunksizes than proximal_gradient) because our full dataset, 100GB, isclose enough to our total distributed RAM (240GB) that it’s a good idea toavoid keeping replias around needlessly. Things would have run fine, butspilling excess data to disk would have negatively affected performance.
2. We set the OMP_NUM_THREADS=1 environment variable to avoidover-subscribing our CPUs. Surprisingly not doing so led both to worseperformance and to non-deterministic results. An issue that we’re stilltracking down.

### Analysis

The algorithms in Dask-GLM are new and need development, but are in a usablestate by people comfortable operating at this technical level. Additionally,we would like to attract other mathematical and algorithmic developers to thiswork. We’ve found that Dask provides a nice balance between being flexibleenough to support interesting algorithms, while being managed enough to beusable by researchers without a strong background in distributed systems. Inthis section we’re going to discuss the things that we learned from bothChris’ (mathematical algorithms) and Matt’s (distributed systems) perspectiveand then talk about possible future work. We encourage people to pay attentionto future work; we’re open to collaboration and think that this is a goodopportunity for new researchers to meaningfully engage.

#### Chris’s perspective

1. Creating distributed algorithms with Dask was surprisingly easy; there isstill a small learning curve around when to call things like persist,compute, rebalance, and so on, but that can’t be avoided. Using Dask foralgorithm development has been a great learning environment forunderstanding the unique challenges associated with distributed algorithms(including communication costs, among others).
2. Getting the particulars of algorithms correct is non-trivial; there is stillwork to be done in better understanding the tolerance settings vs. accuracytradeoffs that are occurring in many of these algorithms, as well asfine-tuning the convergence criteria for increased precision.
3. On the software development side, reliably testing optimization algorithmsis hard. Finding provably correct optimality conditions that should besatisfied which are also numerically stable has been a challenge for me.
4. Working on algorithms in isolation is not nearly as fun as collaborating onthem; please join the conversation and contribute!
5. Most importantly from my perspective, I’ve found there is a surprisinglylarge amount of misunderstanding in “the community” surrounding whatoptimization algorithms do in the world of predictive modeling, whatproblems they each individually solve, and whether or not they areinterchangeable for a given problem. For example, Newton’s method can’t beused to optimize an l1-regularized problem, and the coefficient estimatesfrom an l1-regularized problem are fundamentally (and numerically) differentfrom those of an l2-regularized problem (and from those of an unregularizedproblem). My own personal goal is that the API for dask-glm exposes thesesubtle distinctions more transparently and leads to more thoughtful modelingdecisions “in the wild”.

#### Matt’s perspective

This work triggered a number of concrete changes within the Dask library:

1. We can convert Dask.dataframes to Dask.arrays. This is particularlyimportant because people want to do pre-processing with dataframes but thenswitch to efficient multi-dimensional arrays for algorithms.
2. We had to unify the single-machine scheduler and distributed scheduler APIsa bit, notably adding a persist function to the single machinescheduler. This was particularly important because Chrisgenerally prototyped on his laptop but we wanted to write code that waseffective on clusters.
3. Scheduler overhead can be a problem for the iterative dask-array algorithms(gradient descent, proximal gradient descent, BFGS). This is particularlya problem because NumPy is very fast. Often our tasks take only a fewmilliseconds, which makes Dask’s overhead of 200us per task become veryrelevant (this is why you see whitespace in the profile plots above).We’ve started resolving this problem in a few ways like more aggressivetask fusion and lower overheads generally, but this will be a medium-termchallenge. In practice for dask-glm we’ve started handling this just bychoosing chunksizes well. I suspect that for the dask-glm in particularwe’ll just develop auto-chunksize heuristics that will mostly solve thisproblem. However we expect this problem to recur in other work withscientists on HPC systems who have similar situations.
4. A couple of things can be tricky for algorithmic users:
5. Placing the calls to asynchronously start computation (persist,compute). In practice Chris did a good job here and then I came throughand tweaked things afterwards. The web diagnostics ended up beingcrucial to identify issues.
6. Avoiding accidentally calling NumPy functions on dask.arrays and viceversa. We’ve improved this on the dask.array side, and they now operateintelligently when given numpy arrays. Changing this on the NumPy sideis harder until NumPy protocols change (which is planned).

#### Future work

There are a number of things we would like to do, both in terms of measurementand for the dask-glm project itself. We welcome people to voice their opinions(and join development) on the following issues:

1. Asynchronous Algorithms
2. User APIs
3. Extend GLM families
4. Write more extensive rigorous algorithm testing - for satisfying provable optimality criteria, and for robustness to various input data
5. Begin work on smart initialization routines

What is your perspective here, gentle reader? Both Matt and Chris can use helpon this project. We hope that some of the issues above provide seeds forcommunity engagement. We welcome other questions, comments, and contributionseither as github issues or comments below.

## Acknowledgements

Thanks also go to Hussain Sultan (CapitalOne) and Tom Augspurger for collaborationon Dask-GLM and to Will Warner (Continuum)for reviewing and editing this post.