Jan 13, 2019

Dask, Pandas, and GPUs: first steps


Executive Summary

We’re building a distributed GPU Pandas dataframe out of cuDF and Dask Dataframe. This effort is young.

This post describes the current situation, our general approach, and gives examples of what does and doesn’t work today. We end with some notes on scaling performance.

You can also view the experiment in this post as a notebook.

And here is a table of results:

Architecture      Time      Bandwidth
Single CPU Core   3min 14s  50 MB/s
Eight CPU Cores   58s       170 MB/s
Forty CPU Cores   35s       285 MB/s
One GPU           11s       900 MB/s
Eight GPUs        5s        2000 MB/s

Building Blocks: cuDF and Dask

Building a distributed GPU-backed dataframe is a large endeavor. Fortunately we’re starting on a good foundation and can assemble much of this system from existing components:

  1. The cuDF library aims to implement the Pandas API on the GPU. It gets good speedups on standard operations like reading CSV files, filtering and aggregating columns, joins, and so on.

     import cudf  # looks and feels like Pandas, but runs on the GPU

     df = cudf.read_csv('myfile.csv')
     df = df[df.name == 'Alice']
     df.groupby('id').value.mean()

     cuDF is part of the growing RAPIDS initiative.

  2. The Dask Dataframe library provides parallel algorithms around the Pandas API. It composes large operations like distributed groupbys or distributed joins from a task graph of many smaller single-node groupbys or joins (and many other operations).

     import dask.dataframe as dd  # looks and feels like Pandas, but runs in parallel

     df = dd.read_csv('myfile.*.csv')
     df = df[df.name == 'Alice']
     df.groupby('id').value.mean().compute()

  3. The Dask distributed task scheduler provides general-purpose parallel execution given complex task graphs. It’s good for adding multi-node computing into an existing codebase.

Given these building blocks, our approach is to make the cuDF API close enough to Pandas that we can reuse the Dask Dataframe algorithms.
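To make that concrete, here is a minimal sketch of what reuse looks like from the user’s side, with placeholder file names borrowed from the snippets above: the same Dask Dataframe expression can run over Pandas partitions or, via dask_cudf, over cuDF partitions.

import dask.dataframe as dd

df = dd.read_csv('myfile.*.csv')           # partitions are pandas DataFrames
# df = dask_cudf.read_csv('myfile.*.csv')  # same expression, cuDF partitions

df = df[df.name == 'Alice']
print(df.groupby('id').value.mean().compute())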

Benefits and Challenges to this approach

This approach has a few benefits:

  1. We get to reuse the parallel algorithms found in Dask Dataframe originally designed for Pandas.
  2. It consolidates the development effort within a single codebase so that future effort spent on CPU Dataframes benefits GPU Dataframes and vice versa. Maintenance costs are shared.
  3. By building code that works equally with two DataFrame implementations (CPU and GPU) we establish conventions and protocols that will make it easier for other projects to do the same, either with these two Pandas-like libraries, or with future Pandas-like libraries.
  4. This approach also aims to demonstrate that the ecosystem should support Pandas-like libraries, rather than just Pandas. For example, if (when?) the Arrow library develops a computational system then we’ll be in a better place to roll that in as well.
  5. When doing any refactor we tend to clean up existing code. For example, to make Dask Dataframe ready for a new GPU Parquet reader we end up refactoring and simplifying our Parquet I/O logic.

The approach also has some drawbacks. Namely, it places API pressure on cuDF to match Pandas so:

  1. Slight differences in API now cause larger problems, such as these:
     • Join column ordering differs (rapidsai/cudf #251)
     • Groupby aggregation column ordering differs (rapidsai/cudf #483)
  2. cuDF has some pressure on it to repeat what some believe to be mistakes in the Pandas API. For example, cuDF today supports missing values arguably more sensibly than Pandas. Should cuDF have to revert to the old way of doing things just to match Pandas semantics? Dask Dataframe will probably need to be more flexible in order to handle evolution and small differences in semantics.
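To illustrate why something as small as column ordering matters, here is a hypothetical, CPU-only sketch (made-up data, not the actual cuDF behavior): Dask Dataframe builds lightweight metadata describing the expected output of each operation, and if a backend’s merge orders columns differently than Pandas does, the real partitions no longer match that metadata.

import pandas as pd

left = pd.DataFrame({'id': [1, 2], 'x': [10.0, 20.0]})
right = pd.DataFrame({'id': [1, 2], 'y': [3.0, 4.0]})

# The column order Pandas produces is what Dask Dataframe's metadata expects.
expected = list(left.merge(right, on='id').columns)

# Pretend a backend returned the same data with a different column order.
simulated = ['id', 'y', 'x']

print(expected, simulated, expected == simulated)  # False -> metadata mismatch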

Alternatives

We could also write a new dask-dataframe-style project around cuDF that deviates from the Pandas/Dask Dataframe API. Until recently this has actually been the approach, and the dask-cudf project did exactly this. This was probably a good choice early on to get started and prototype things. The project was able to implement a wide range of functionality including groupby-aggregations, joins, and so on using dask delayed.
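For readers unfamiliar with that style, here is a rough, illustrative sketch of building a GPU-backed computation out of dask.delayed objects; it is not dask-cudf’s actual code, the file names are placeholders, and it assumes the RAPIDS cudf library and a CUDA-capable GPU are available.

import dask

@dask.delayed
def read_one(path):
    import cudf  # requires the RAPIDS cudf library and a CUDA-capable GPU
    return cudf.read_csv(path)

@dask.delayed
def partial_sum(gdf):
    return gdf.passenger_count.sum()

# Placeholder file names; each delayed object wraps one GPU dataframe.
parts = [read_one(p) for p in ['part-0.csv', 'part-1.csv']]
total = dask.delayed(sum)([partial_sum(p) for p in parts])
print(total.compute())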

We’re redoing this now on top of dask dataframe though, which means that we’re losing some functionality that dask-cudf already had, but hopefully the functionality that we add now will be more stable and established on a firmer base.

Status Today

Today very little works, but what does work is decently smooth.

Here is a simple example that reads some data from many CSV files, picks out a column, and does some aggregations.

from dask_cuda import LocalCUDACluster
import dask_cudf
from dask.distributed import Client

cluster = LocalCUDACluster() # runs on eight local GPUs
client = Client(cluster)

gdf = dask_cudf.read_csv('data/nyc/many/*.csv') # wrap around many CSV files

>>> gdf.passenger_count.sum().compute()
184464740

Also note, NYC Taxi ridership is significantly less than it was a few years ago.

What I’m excited about in the example above

  • All of the infrastructure surrounding the cuDF code, like the cluster setup, diagnostics, JupyterLab environment, and so on, came for free, like any other new Dask project.
  • Here is an image of my JupyterLab setup: [Image: Dask + CUDA + cuDF JupyterLab environment]
  • Our df object is actually just a normal Dask Dataframe. We didn’t have to write new __repr__, __add__, or .sum() implementations, and probably many functions we didn’t think about work well today (though also many don’t).
  • We’re tightly integrated and more connected to other systems. For example, if we wanted to convert our dask-cudf-dataframe to a dask-pandas-dataframe then we would just use the cuDF to_pandas function:
  • df = df.map_partitions(cudf.DataFrame.to_pandas)
  • We don’t have to write anything special like a separate .to_dask_dataframe method or handle other special cases.
  • Dask parallelism is orthogonal to the choice of CPU or GPU.
  • It’s easy to switch hardware. By avoiding separate dask-cudf code paths it’s easier to add cuDF to an existing Dask+Pandas codebase to run on GPUs, or to remove cuDF and use Pandas if we want our code to be runnable without GPUs (a CPU version of the example above is sketched just after this list).
  • There are more examples of this in the scaling section below.
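As a concrete illustration of that last point, here is a sketch of the CPU equivalent of the example above, assuming the same 'data/nyc/many/*.csv' files; only the cluster type and the read_csv call change.

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()                   # CPU workers instead of LocalCUDACluster
client = Client(cluster)

df = dd.read_csv('data/nyc/many/*.csv')    # pandas-backed partitions
print(df.passenger_count.sum().compute())  # same expression as the GPU version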

What’s wrong with the example above

In general the answer is many small things.

  1. The cudf.read_csv function doesn’t yet support reading chunks from a single CSV file, and so doesn’t work well with very large CSV files. We had to split our large CSV files into many smaller CSV files first with normal Dask+Pandas:

     import dask.dataframe as dd

     (dd.read_csv('few-large/*.csv')
        .repartition(npartitions=100)
        .to_csv('many-small/*.csv', index=False))

     (See rapidsai/cudf #568)

  2. Many operations that used to work in dask-cudf like groupby-aggregations and joins no longer work. We’re going to need to slightly modify many cuDF APIs over the next couple of months to more closely match their Pandas equivalents.
  3. I ran the timing cell twice because it currently takes a few seconds to import cudf today. (See rapidsai/cudf #627)
  4. We had to make Dask Dataframe a bit more flexible and assume less about its constituent dataframes being exactly Pandas dataframes (see dask/dask #4359 and dask/dask #4375 for examples; a rough sketch of the kind of change involved follows this list). I suspect that there will be many more small changes like these necessary in the future.
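To make that last item more concrete, here is an illustrative sketch (not Dask’s actual internals, and concat_partitions is a hypothetical helper) of the kind of type-based dispatch involved: acting on a partition’s own type rather than assuming every partition is a pandas.DataFrame.

import importlib
import pandas as pd

def concat_partitions(parts):
    # Hypothetical helper: combine a list of same-typed dataframe partitions.
    first = parts[0]
    if isinstance(first, pd.DataFrame):
        return pd.concat(parts)
    # Otherwise defer to the library that defines the partition type
    # (cudf, for example, also provides a top-level concat function).
    lib = importlib.import_module(type(first).__module__.split('.')[0])
    return lib.concat(parts)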

These problems are representative of dozens more similar issues. They are all fixable and indeed, many are actively being fixed today by the good folks working on RAPIDS.

Near Term Schedule

The RAPIDS group is currently busy working to release 0.5, which includes some of the fixes necessary to run the example above, and also many unrelated stability improvements. This will probably keep them busy for a week or two during which I don’t expect to see much Dask + cuDF work going on other than planning.

After that, Dask parallelism support will be a top priority, so I look forward to seeing some rapid progress here.

Scaling Results

In my last post about combining Dask Array with CuPy, a GPU-accelerated Numpy, we saw impressive speedups from using many GPUs on a simple problem that manipulated some simple random data.

Dask Array + CuPy on Random Data

Architecture      Time
Single CPU Core   2hr 39min
Forty CPU Cores   11min 30s
One GPU           1min 37s
Eight GPUs        19s

That exercise was easy to scale because it was almost entirely bound by the computation of creating random data.

Dask DataFrame + cuDF on CSV data

We did a similar study on the read_csv example above, which is bound mostly by reading CSV data from disk and then parsing it. You can see a notebook available here. We have similar (though less impressive) numbers to present.

Architecture      Time      Bandwidth
Single CPU Core   3min 14s  50 MB/s
Eight CPU Cores   58s       170 MB/s
Forty CPU Cores   35s       285 MB/s
One GPU           11s       900 MB/s
Eight GPUs        5s        2000 MB/s

The bandwidth numbers were computed by noting that the data was around 10 GB on disk.
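As a quick sanity check, the bandwidth column follows from dividing the data size by the elapsed time; here is a small back-of-the-envelope script reproducing that arithmetic, assuming exactly 10 GB (10,000 MB) on disk.

data_mb = 10_000  # assume roughly 10 GB on disk
timings_s = {
    'Single CPU Core': 3 * 60 + 14,
    'Eight CPU Cores': 58,
    'Forty CPU Cores': 35,
    'One GPU': 11,
    'Eight GPUs': 5,
}
for arch, seconds in timings_s.items():
    print(f'{arch:>16}: ~{data_mb / seconds:,.0f} MB/s')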

Analysis

First, I want to emphasize again that it’s easy to test a wide variety of architectures using this setup because of the Pandas API compatibility between all of the different projects. We’re seeing a wide range of performance (40x span) across a variety of different hardware with a wide range of cost points.

Second, note that this problem scales less well than our previous example with CuPy, both on CPU and GPU. I suspect that this is because this example is also bound by I/O and not just number-crunching. While the jump from single-CPU to single-GPU is large, the jump from single-CPU to many-CPU or single-GPU to many-GPU is not as large as we would have liked. For GPUs, for example, we got around a 2x speedup when we added 8x as many GPUs.

At first one might think that this is because we’re saturating disk read speeds. However, two pieces of evidence go against that guess:

  • NVIDIA folks familiar with my current hardware inform me that they’re able to get much higher I/O throughput when they’re careful
  • The CPU scaling is similarly poor, despite the fact that it’s obviously not reaching full I/O bandwidth

Instead, it’s likely that we’re just not treating our disks and I/O pipelines carefully.

We might think more carefully about data locality within a single machine. Alternatively, we might just choose to use a smaller machine, or many smaller machines. My team has been asking me to start playing with some cheaper systems than a DGX, so I may experiment with those soon. It may be that for data-loading and pre-processing workloads the previous wisdom of “pack as much computation as you can into a single box” no longer holds (without us doing more work, that is).

Come help

If the work above sounds interesting to you then come help! There is a lot of low-hanging and high-impact work to do.

If you’re interested in being paid to focus more on these topics, then consider applying for a job. NVIDIA’s RAPIDS team is looking to hire engineers for Dask development with GPUs and other data analytics library development projects.