May 13, 2020

Large SVDs

Summary

We perform Singular Value Decomposition (SVD) calculations on large datasets.

We vary the computation along two axes: exact versus approximate methods, and CPUs versus GPUs.

In the end we compute an approximate SVD of 200GB of simulated data on a multi-GPU machine in 15-20 seconds. We then run the same computation on a dataset stored in the cloud, where we find that I/O is, predictably, a major bottleneck.

SVD - The simple case

Dask arrays contain a relatively sophisticated SVD algorithm that works in the tall-and-skinny or short-and-fat cases, but not so well in the roughly-square case. It works by taking QR decompositions of each block of the array, combining the R matrices, doing another smaller SVD on those, and then performing some matrix multiplication to get back to the full result. It’s numerically stable and decently fast, assuming that the intermediate R matrices of the QR decompositions mostly fit in memory.
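As a rough illustration of the idea (a minimal NumPy sketch, not Dask’s actual implementation), the blockwise approach looks something like this:

import numpy as np

def blocked_tall_skinny_svd(blocks):
    """ Sketch of the blocked QR-based SVD idea (not Dask's implementation) """
    # QR-factor each tall block independently
    qrs = [np.linalg.qr(block) for block in blocks]
    # Stack the small R factors and take an SVD of that stack
    R = np.vstack([r for _, r in qrs])
    u_r, s, vt = np.linalg.svd(R, full_matrices=False)
    # Map the small left factor back through each block's Q
    m = blocks[0].shape[1]
    u = np.vstack([q @ u_r[i * m:(i + 1) * m] for i, (q, _) in enumerate(qrs)])
    return u, s, vt

Dask’s da.linalg.svd does this lazily across chunks, which is why only the intermediate R matrices ever need to be combined in memory.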

The memory constraint here is that if you have an n by m tall and skinny array (n >> m) cut into k blocks, then you need about m**2 * k space for the intermediate R matrices. For the 20-column, 16-chunk array in the example below that is only about 20**2 * 16 * 8 bytes ≈ 50 kB of float64 data. This constraint is satisfied in many cases, including typical PCA machine learning workloads, where you have tabular data with a few columns (hundreds at most) and many rows.

It’s easy to use and quite robust.

import dask.array as da

x = da.random.random((10000000, 20))
x

        Array           Chunk
Bytes   1.60 GB         100.00 MB
Shape   (10000000, 20)  (625000, 20)
Count   16 Tasks        16 Chunks
Type    float64         numpy.ndarray

u, s, v = da.linalg.svd(x)

This works fine in the short and fat case too (when you have far more columns than rows), but we’re always going to assume that one of your dimensions is unchunked, and that the other dimension has chunks that are quite a bit longer; otherwise, things might not fit into memory.
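For example (an illustrative chunking, not from the original post), a short-and-fat array would be chunked only along its long dimension:

import dask.array as da

# Rows unchunked, columns split into long chunks
y = da.random.random((20, 10_000_000), chunks=(20, 625_000))
u, s, v = da.linalg.svd(y)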

Approximate SVD

If your dataset is large in both dimensions then the algorithm above won’t work as is. However, if you don’t need exact results, or if you only need a few of the components, then there are a number of excellent approximation algorithms.

Dask array has one of these approximation algorithms implemented in the da.linalg.svd_compressed function. With it we can compute the approximate SVD of very large matrices.
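This function is based on the well-known randomized SVD recipe: sketch the matrix with a random projection, orthogonalize, project down, and take a small exact SVD. A minimal NumPy sketch of that recipe, for intuition only, looks like this:

import numpy as np

def randomized_svd(a, k, oversample=10):
    """ Toy randomized SVD: project onto a random low-dimensional subspace """
    m, n = a.shape
    # A random sketch captures the dominant column space of a
    omega = np.random.standard_normal((n, k + oversample))
    q, _ = np.linalg.qr(a @ omega)          # orthonormal basis for the sketch
    b = q.T @ a                             # small (k + oversample, n) matrix
    u_b, s, vt = np.linalg.svd(b, full_matrices=False)
    return (q @ u_b)[:, :k], s[:k], vt[:k]  # approximate top-k factors

The key point is that the full matrix is touched only through a few matrix multiplications, which is what makes a multi-pass, out-of-core, or multi-GPU implementation practical.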

We were recently working on a problem (explained below) and found that we were still running out of memory when dealing with this algorithm. There were two challenges that we ran into:

  1. The algorithm requires multiple passes over the data, but the Dask task scheduler was keeping the input matrix in memory after it had been loaded once in order to avoid recomputation. Things still worked, but Dask had to move the data to disk and back repeatedly, which reduced performance significantly. We resolved this by including explicit recomputation steps in the algorithm.
  2. Related chunks of data would be loaded at different times, and so would need to stick around longer than necessary to wait for their associated chunks. We resolved this by engaging task fusion as an optimization pass.

Before diving further into the technical solution, we quickly describe the use case that motivated this work.

Application - Genomics

Many studies are using genome sequencing to study genetic variation between different individuals within a species. These include studies of human populations, but also other species such as mice, mosquitoes, or disease-causing parasites. These studies will, in general, find a large number of sites in the genome sequence where individuals differ from each other. For example, humans have more than 100 million variable sites in the genome, and modern studies like the UK BioBank are working towards sequencing the genomes of 1 million individuals or more.

In diploid species like humans, mice, or mosquitoes, each individual carries two genome sequences, one inherited from each parent. At each of the 100 million variable genome sites there will be two or more “alleles” that a single genome might carry. One way to think about this is via the Punnett square, which represents the different possible genotypes that one individual might carry at one of these variable sites:

[Figure: Punnett square]

In the above there are three possible genotypes: AA, Aa, and aa. For computational genomics, these genotypes can be encoded as 0, 1, or 2. In a study of a species with M genetic variants assayed in N individual samples, we can represent these genotypes as an (M x N) array of integers. For a modern human genetics study, the scale of this array might approach (100 million x 1 million). (Although in practice, the size of the first dimension (number of variants) can be reduced somewhat, by at least an order of magnitude, because many genetic variants will carry little information and/or be correlated with each other.)
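To make this data model concrete, here is a purely illustrative simulated genotype matrix built as a chunked Dask array of small integers, in the same spirit as the simulated data used later in this post:

import dask.array as da

# M variants x N samples, genotypes encoded as 0, 1, or 2
# (sizes here are illustrative; a real study would be far larger)
rs = da.random.RandomState()
genotypes = rs.randint(0, 3, size=(1_000_000, 10_000),
                       chunks=(10_000, 10_000), dtype="uint8")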

These genetic differences are not random, but carry information about patterns of genetic similarity and shared ancestry between individuals, because of the way they have been inherited through many generations. A common task is to perform a dimensionality reduction analysis on these data, such as a principal components analysis (via SVD), to identify genetic structure reflecting these differences in degree of shared ancestry. This is an essential part of discovering genetic variants associated with different diseases, and of learning more about the genetic history of populations and species.

Reducing the time taken to compute an analysis such as an SVD allows researchers to explore larger datasets and test more hypotheses in less time. Practically, this means not simply a fast SVD but an accelerated pipeline end-to-end, from data loading to analysis to understanding.

We want to run an experiment in less time than it takes to make a cup of tea

Performant SVDs w/ Dask

Now that we have that scientific background, let’s transition back to talking about computation.

To stop Dask from holding onto the data, we intentionally trigger computation as we build up the graph. This is a bit atypical in Dask calculations (we usually prefer to build up as much of the computation as possible before computing anything), but useful given the multiple-pass nature of this problem. This was a fairly easy change, and is available in dask/dask #5041.

Additionally, we found that it was helpful to turn on moderately wide task fusion.

import dask
dask.config.set({"optimization.fuse.ave-width": 5})
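If we’d rather scope this setting to a single computation instead of changing it globally, dask.config.set also works as a context manager (shown here with the same svd_compressed call used below):

with dask.config.set({"optimization.fuse.ave-width": 5}):
    u, s, v = da.linalg.svd_compressed(x, k=10)
    v.compute()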

Then things work fine.

We’re going to try this SVD on a few different choices of hardware including:

  1. A MacBook Pro
  2. A DGX-2, an NVIDIA workstation with 16 high-end GPUs and fast interconnect
  3. A twenty-node cluster on AWS

MacBook Pro

We can happily perform an SVD on a 20GB array on a MacBook Pro:

import dask.array as da

x = da.random.random(size=(1_000_000, 20_000), chunks=(20_000, 5_000))

u, s, v = da.linalg.svd_compressed(x, k=10, compute=True)
v.compute()

This call is no longer entirely lazy, and it recomputes x a couple of times, but it works, and it works using only a few GB of RAM on a consumer laptop.

It takes around 2min 30s to compute on a laptop. That’s great! It was super easy to try out, didn’t require any special hardware or setup, and in many cases is fast enough. By working locally we can iterate quickly.

Now that things work, we can experiment with different hardware.

Adding GPUs (a 15 second SVD)

Disclaimer: one of the authors (Ben Zaitlen) works for NVIDIA

We can dramatically increase performance by using a multi-GPU machine. NVIDIA and other manufacturers now make machines with multiple GPUs co-located in the same physical box. In the following section, we will run the calculations on a DGX-2, a machine with 16 GPUs and fast interconnect between the GPUs.

Below is almost the same code, running in significantly less time, but with the following changes:

  1. We increase the size of the array by a factor of 10x
  2. We switch out NumPy for CuPy, a GPU NumPy implementation
  3. We use a sixteen-GPU DGX-2 machine with NVLink interconnects between GPUs (NVLink will dramatically decrease transfer time between workers)

On a DGX-2 we can calculate the SVD of a 200GB Dask array in 10 to 15 seconds.

The full notebook is here, but the relevant code snippets are below:

# Some GPU specific setup
from dask.distributed import Client
from dask_cuda import LocalCluster

cluster = LocalCluster(...)
client = Client(cluster)

import cupy
import dask.array as da
rs = da.random.RandomState(RandomState=cupy.random.RandomState)

# Create the data and run the SVD as normal
x = rs.randint(0, 3, size=(10_000_000, 20_000),
               chunks=(20_000, 5_000), dtype="uint8")
x = x.persist()

u, s, v = da.linalg.svd_compressed(x, k=10, seed=rs)
v.compute()

To see this run, we recommend viewing this screencast:

Read dataset from Disk

While impressive, the computation above is mostly bound by generating random data and then performing matrix calculations. GPUs are good at both of these things.

In practice though, our input array won’t be randomly generated. It will come from some dataset stored on disk or, increasingly commonly, stored in the cloud. To make things more realistic, we perform a similar calculation with data stored in a Zarr format in GCS.
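A minimal sketch of what loading such a dataset looks like (the bucket path here is hypothetical, and the gcsfs package is needed for the gcs:// protocol):

import dask.array as da

# Hypothetical bucket and path
x = da.from_zarr("gcs://my-bucket/genotypes.zarr")
x = x.persist()  # pull the data into memory before running the SVD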

In this Zarr SVD example, we load a 25GB GCS-backed dataset onto a DGX-2, run a few processing steps, then perform an SVD. The combination of preprocessing and SVD calculations ran in 18.7 seconds and the data loading took 14.3 seconds.

Again, on a DGX-2, we run from data loading to SVD in less time than it would take to make a cup of tea. However, the data loading can be accelerated. From GCS we are reading the data into the main memory of the machine (host memory), uncompressing the Zarr bits, then moving the data from host memory to the GPU (device memory). Passing data back and forth between host and device memory can significantly decrease performance. Reading directly into the GPU, bypassing host memory, would improve the overall pipeline.

And so we come back to a common lesson of high performance computing:

High performance computing isn’t about doing one thing exceedingly well, it’sabout doing nothing poorly.

In this case GPUs made our computation fast enough that we now need to focus on other components of our system, notably disk bandwidth and a direct reader from Zarr data to GPU memory.

Cloud

Disclaimer: one of the authors (Matthew Rocklin) works for Coiled Computing

We can also run this on the cloud with any number of frameworks. In this case we used the Coiled Cloud service to deploy on AWS.

from coiled_cloud import Cloud, Cluster
cloud = Cloud()

cloud.create_cluster_type(
    organization="friends",
    name="genomics",
    worker_cpu=4,
    worker_memory="16 GiB",
    worker_environment={
        "OMP_NUM_THREADS": 1,
        "OPENBLAS_NUM_THREADS": 1,
        # "EXTRA_PIP_PACKAGES": "zarr"
    },
)

cluster = Cluster(
    organization="friends",
    typename="genomics",
    n_workers=20,
)

from dask.distributed import Client
client = Client(cluster)

# then proceed as normal

Using 20 machines with a total of 80 CPU cores on a dataset that was 10x larger than the MacBook Pro example, we were able to run in about the same amount of time. This shows near-optimal scaling for this problem, which is nice to see given how complex the SVD calculation is.

A screencast of this problem is viewable here.

Compression

One of the easiest ways for us to improve performance is to reduce the size of this data through compression. This data is highly compressible for two reasons:

  1. The real-world data itself has structure and repetition (although the random play data does not)
  2. We’re storing entries that take on only four values. We’re using eight-bit integers when we only need two-bit integers

Let’s solve the second problem first.

Compression with bit twiddling

Ideally NumPy would have a two-bit integer datatype. Unfortunately it doesn’t, and this is hard to add because memory in computers is generally addressed in whole bytes.

To work around this we can use bit arithmetic to pack four values into a single byte. Here are functions that do that, assuming that the values fit in two bits and that the last dimension of our array is divisible by four.

import numpy as np

def compress(x: np.ndarray) -> np.ndarray:
    """ Pack four two-bit values into each byte """
    out = np.zeros_like(x, shape=(x.shape[0], x.shape[1] // 4))
    out += x[:, 0::4]
    out += x[:, 1::4] << 2
    out += x[:, 2::4] << 4
    out += x[:, 3::4] << 6
    return out


def decompress(out: np.ndarray) -> np.ndarray:
    """ Unpack each byte back into four two-bit values """
    back = np.zeros_like(out, shape=(out.shape[0], out.shape[1] * 4))
    back[:, 0::4] = out & 0b00000011
    back[:, 1::4] = (out & 0b00001100) >> 2
    back[:, 2::4] = (out & 0b00110000) >> 4
    back[:, 3::4] = (out & 0b11000000) >> 6
    return back
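As a quick sanity check (a small hypothetical example, not from the original post), the round trip is lossless for values that fit in two bits, and the packed array is a quarter of the size:

a = np.random.randint(0, 4, size=(8, 8), dtype="uint8")
assert (decompress(compress(a)) == a).all()   # lossless for values 0-3
assert compress(a).nbytes == a.nbytes // 4    # four values packed per byte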

Then, we can use these functions along with Dask to store our data in a compressed state, but lazily decompress on demand.

x = x.map_blocks(compress).persist().map_blocks(decompress)

That’s it. We compress each block of our data and store that in memory. However, the output variable x will decompress each chunk before we operate on it, so we don’t need to worry about handling compressed blocks.

Compression with Zarr

A slightly more general but probably less efficient route would be to compress our arrays with a proper compression library like Zarr.

The example below shows how we do this in practice.

import zarr
import numpy as np
from numcodecs import Blosc
compressor = Blosc(cname='lz4', clevel=3, shuffle=Blosc.BITSHUFFLE)


x = x.map_blocks(zarr.array, compressor=compressor).persist().map_blocks(np.array)

Additionally, if we’re using the dask-distributed scheduler then we want to make sure that the Blosc compression library doesn’t use additional threads. That way we don’t have parallel calls of a parallel library, which can cause some contention.

def set_no_threads_blosc():
    """ Stop blosc from using multiple threads """
    import numcodecs
    numcodecs.blosc.use_threads = False

# Run on all workers
client.register_worker_plugin(set_no_threads_blosc)

This approach is more general, and probably a good trick to have up one’s sleeve, but it also doesn’t work on GPUs, which is ultimately why we went with the bit-twiddling approach one section above: it uses an API that is uniformly accessible within the NumPy and CuPy libraries.

Final Thoughts

In this post we did a few things, all around a single important problem in genomics.

  1. We learned a bit of science
  2. We translated a science problem into a computational problem, and in particular into a request to perform large singular value decompositions
  3. We used a canned algorithm in dask.array that performed pretty well, assuming that we’re comfortable going over the array in a few passes
  4. We then tried that algorithm on three architectures quickly:
     - A MacBook Pro
     - A multi-GPU machine
     - A cluster in the cloud
  5. Finally we talked about some tricks to pack more data into the same memory with compression

This problem was nice in that we got to dive deep into a technical science problem, and yet also try a bunch of architectures quickly to investigate hardware choices that we might make in the future.

We used several technologies here today, made by several different communities and companies. It was great to see how they all worked together seamlessly to provide a flexible-yet-consistent experience.