Jan 29, 2019

Single-Node Multi-GPU Dataframe Joins

By

Summary

We experiment with single-node multi-GPU joins using cuDF and Dask. We find that the in-GPU computation is faster than communication. We also present context and plans for near-future work, including improving high-performance communication in Dask with UCX.

Here is a notebook of the experiment in this post

Introduction

In a recent post we showed how Dask + cuDF could accelerate reading CSV files using multiple GPUs in parallel. That operation quickly became bound by the speed of our disk after we added a few GPUs. Now we try a very different kind of operation, multi-GPU joins.

This workload can be communication-heavy, especially if the column on which we are joining is not sorted nicely, and so it provides a good example at the opposite extreme from parsing CSV.
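To see why key layout matters, here is a minimal pure-Python sketch of a hash-partitioned shuffle, one common way to run a distributed join. It is an illustration under my own assumptions, not the code path that Dask or cuDF actually runs: the `key % n_workers` ownership rule, the function names, and the toy sizes are all hypothetical. It counts how many rows have to leave their starting worker before matching keys can meet.

```python
import random


def partition_by_key(rows, n_workers):
    """Group (key, value) rows by destination worker, chosen as key % n_workers."""
    buckets = [[] for _ in range(n_workers)]
    for key, value in rows:
        buckets[key % n_workers].append((key, value))
    return buckets


def count_moved_rows(worker_rows, n_workers):
    """Count rows that sit on a worker other than the one that owns their key."""
    moved = 0
    for worker, rows in enumerate(worker_rows):
        for key, _ in rows:
            if key % n_workers != worker:
                moved += 1
    return moved


# Toy sizes (assumptions for illustration only, far smaller than the benchmark).
n_workers, n_rows, n_keys = 8, 80_000, 5_000
rng = random.Random(0)
keys = [rng.randrange(n_keys) for _ in range(n_rows)]

# Unsorted layout: rows are dealt to workers in arrival order, regardless of key.
chunk = n_rows // n_workers
unsorted_layout = [
    [(k, None) for k in keys[i * chunk:(i + 1) * chunk]]
    for i in range(n_workers)
]

# Nicely arranged layout: every row already lives on the worker that owns its key.
partitioned_layout = partition_by_key([(k, None) for k in keys], n_workers)

moved_unsorted = count_moved_rows(unsorted_layout, n_workers)
moved_partitioned = count_moved_rows(partitioned_layout, n_workers)

print(moved_unsorted / n_rows)  # roughly (n_workers - 1) / n_workers
print(moved_partitioned)        # 0
```

With randomly placed keys, nearly every row has to travel, which is the situation this benchmark deliberately creates.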

Benchmark

Construct random data using the CPU

Here we use Dask array and Dask dataframe to construct two random tables with a shared id column. We can play with the number of rows of each table and the number of keys to make the join challenging in a variety of ways.

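import dask.array as da
import dask.dataframe as dd

n_rows = 1000000000  # rows in the large (left) table
n_keys = 5000000     # upper bound on the number of distinct join keys

# Left table: random float column 'x' plus a random integer 'id' key column
left = dd.concat([
    da.random.random(n_rows).to_dask_dataframe(columns='x'),
    da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
], axis=1)

n_rows = 10000000  # rows in the smaller (right) table

# Right table: random float column 'y' plus the same kind of 'id' key column
right = dd.concat([
    da.random.random(n_rows).to_dask_dataframe(columns='y'),
    da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
], axis=1)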

Send to the GPUs

We have two Dask dataframes composed of many Pandas dataframes of our random data. We now map the cudf.from_pandas function across these to make a Dask dataframe of cuDF dataframes.

import dask
import cudf

gleft = left.map_partitions(cudf.from_pandas)
gright = right.map_partitions(cudf.from_pandas)

gleft, gright = dask.persist(gleft, gright) # persist data in device memory

What’s nice here is that there wasn’t any special dask_pandas_dataframe_to_dask_cudf_dataframe function. Dask composed nicely with cuDF. We didn’t need to do anything special to support it.

We also persisted the data in device memory.

After this, simple operations are easy and fast and use our eight GPUs.

>>> gleft.x.sum().compute() # this takes 250ms
500004719.254711

Join

We’ll use standard Pandas syntax to merge the datasets, persist the result in RAM, and then wait.

out = gleft.merge(gright, on=['id']) # this is lazy

Profile and analyze results

We now look at the Dask diagnostic plots for this computation.

Task stream and communication

When we look at Dask’s task stream plot we see that each of our eight threads (each of which manages a single GPU) spent most of its time in communication (red is communication time). The actual merge and concat tasks are quite fast relative to the data transfer time.

That’s not too surprising. For this computation I’ve turned off any attempt to communicate directly between devices (more on this below), so the data is being moved from GPU memory to CPU memory, then serialized and put onto a TCP socket. We’re moving tens of GB on a single machine, and we’re seeing about 1 GB/s total throughput across the system, which is typical for TCP-on-localhost in Python.
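A rough back-of-the-envelope estimate shows why transfer time dominates at this throughput. The sketch below assumes 8-byte values (float64 for `x` and `y`, int64 for `id`), which is my assumption about the dtypes rather than something measured in this experiment, and it assumes each input byte crosses the socket exactly once. The real run may move more than that, so treat the result as a lower bound on communication time rather than a prediction.

```python
BYTES_PER_VALUE = 8  # assumption: float64 'x'/'y' columns and int64 'id' column


def table_bytes(n_rows, n_columns=2):
    """Approximate in-memory size of a table with fixed-width 8-byte columns."""
    return n_rows * n_columns * BYTES_PER_VALUE


def transfer_seconds(n_bytes, bytes_per_second):
    """Time to push n_bytes through a channel with the given throughput."""
    return n_bytes / bytes_per_second


left_bytes = table_bytes(1_000_000_000)  # the large table from the benchmark
right_bytes = table_bytes(10_000_000)    # the small table from the benchmark

total_gb = (left_bytes + right_bytes) / 1e9
seconds = transfer_seconds(left_bytes + right_bytes, 1e9)  # about 1 GB/s over TCP

print(f"{total_gb:.2f} GB, about {seconds:.0f} s at 1 GB/s if every byte moves once")
# prints "16.16 GB, about 16 s at 1 GB/s if every byte moves once"
```

For comparison, the sum over the full left column shown earlier took about 250ms, so even this optimistic transfer estimate is far larger than the simple in-GPU operations.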

Flamegraph of computation

We can also look more deeply at the computational costs in Dask’s flamegraph-style plot. This shows which lines of our functions were taking up the most time (down to the Python level at least).

This flame graph shows which lines of cuDF code we spent time on while computing (excluding the main communication costs mentioned above). It may be interesting for those trying to further optimize performance. It shows that most of our costs are in memory allocation. Like communication, this has also been addressed, in this case by RAPIDS’ optional memory management pool. The pool just isn’t the default yet, so I didn’t use it here.

Plans for efficient communication

The cuDF library actually has a decent approach to single-node multi-GPU communication that I’ve intentionally turned off for this experiment. That approach cleverly used Dask to communicate device pointer information using Dask’s normal channels (this is small and fast) and then used that information to initiate a side-channel communication for the bulk of the data. This approach was effective, but somewhat fragile. I’m inclined to move on from it in favor of …

UCX. The UCX project provides a single API that wraps around several transports like TCP, InfiniBand, shared memory, and also GPU-specific transports. UCX claims to find the best way to communicate data between two points given the hardware available. If Dask were able to use this for communication then it would provide both efficient GPU-to-GPU communication on a single machine, and also efficient inter-machine communication when efficient networking hardware like InfiniBand is present, even outside the context of GPUs.
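Both the earlier pointer-passing approach and a shared-memory transport rest on the same idea: send a small handle over the regular channel and let the receiver reach the bulk data directly. The sketch below is a CPU-only analogy using Python's standard library, not how cuDF or UCX implement it. The queue stands in for Dask's normal channel, the shared memory block stands in for device memory, and the handle layout is hypothetical.

```python
import queue
from multiprocessing import shared_memory

# Stands in for the regular message channel, which only carries small messages here.
control_channel = queue.Queue()

# Sender: place the bulk payload in shared memory, then send only a small handle.
payload = bytes(range(256)) * 4096  # 1 MiB of example data
block = shared_memory.SharedMemory(create=True, size=len(payload))
block.buf[:len(payload)] = payload
control_channel.put({"name": block.name, "nbytes": len(payload)})

# Receiver: read the handle, attach to the same memory, and read the data.
# The payload itself never passes through the control channel.
handle = control_channel.get()
attached = shared_memory.SharedMemory(name=handle["name"])
received = bytes(attached.buf[:handle["nbytes"]])

# Clean up: both sides close their view, and the owner frees the block.
attached.close()
block.close()
block.unlink()

print(received == payload)
```

The handle is a few dozen bytes regardless of payload size, which is why the normal channel stays fast. The fragility mentioned above shows up even in this toy: the receiver depends on the sender keeping the block alive until it has finished reading.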

There is some work we need to do here:

  1. We need to make a Python wrapper around UCX
  2. We need to make an optional Dask Comm around this ucx-py library that allows users to specify endpoints like ucx://path-to-scheduler
  3. We need to make Python memoryview-like objects that refer to device memory
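To make the second item a little more concrete, here is a minimal sketch of choosing a communication backend from the scheme in an address such as ucx://path-to-scheduler. Everything in it is hypothetical: the `BACKENDS` registry, `parse_address`, `connect`, and the placeholder connection functions are names I made up for illustration, and they are not the Dask Comm or ucx-py interfaces.

```python
def connect_tcp(location):
    # Hypothetical placeholder for an ordinary socket-based connection.
    return f"tcp connection to {location}"


def connect_ucx(location):
    # Hypothetical placeholder: a real backend would call into a UCX wrapper.
    return f"ucx connection to {location}"


# Registry mapping an address scheme to the function that opens a connection.
BACKENDS = {"tcp": connect_tcp, "ucx": connect_ucx}


def parse_address(address, default_scheme="tcp"):
    """Split 'scheme://location'; fall back to the default when no scheme is given."""
    scheme, sep, location = address.partition("://")
    if not sep:
        return default_scheme, address
    return scheme, location


def connect(address):
    """Open a connection using whichever backend the address scheme selects."""
    scheme, location = parse_address(address)
    try:
        backend = BACKENDS[scheme]
    except KeyError:
        raise ValueError(f"unknown scheme {scheme!r}") from None
    return backend(location)


print(connect("ucx://path-to-scheduler"))  # prints "ucx connection to path-to-scheduler"
```

Keeping the backend behind a scheme lookup is what would make such a transport optional: users who never write a ucx:// address never touch the UCX code path.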

This work is already in progress by Akshay Venkatesh, who works on UCX, and Tom Augspurger, a core Dask/Pandas developer. I suspect that they’ll write about it soon. I’m looking forward to seeing what comes of it, both for Dask and for high performance Python generally.

It’s worth pointing out that this effort won’t just help GPU users. It should help anyone on advanced networking hardware, including the mainstream scientific HPC community.

Summary

Single-node multi-GPU joins have a lot of promise. In fact, earlier RAPIDS developers got this running much faster than I was able to do above through the clever communication tricks I briefly mentioned. The main purpose of this post is to provide a benchmark for joins that we can use in the future, and to highlight when communication can be essential in parallel computing.

Now that GPUs have accelerated the computation time of each of our chunks of work we increasingly find that other systems become the bottleneck. We didn’t care as much about communication before because computational costs were comparable. Now that computation is an order of magnitude cheaper, other aspects of our stack become much more important.

I’m looking forward to seeing where this goes.

Come help!

If the work above sounds interesting to you then come help! There is a lot of low-hanging and high-impact work to do.

If you’re interested in being paid to focus more on these topics, then consider applying for a job. NVIDIA’s RAPIDS team is looking to hire engineers for Dask development with GPUs and other data analytics library development projects.