This work was done in collaboration with Matthew Rocklin (Anaconda), Jim Edwards (NCAR), Guillaume Eynard-Bontemps (CNES), and Loïc Estève (INRIA), and is supported, in part, by the US National Science Foundation EarthCube program. The dask-jobqueue package is a spinoff of the Pangeo Project. This blog post was previously published here.
TL;DR: Dask-jobqueue allows you to seamlessly deploy dask on HPC clusters that use a variety of job queueing systems such as PBS, Slurm, SGE, or LSF. Dask-jobqueue provides a Pythonic user interface that manages dask workers and clusters through the submission, execution, and deletion of individual jobs on an HPC system. It gives users the ability to interactively scale workloads across large HPC systems, turning an interactive Jupyter Notebook into a powerful tool for scalable computation on very large datasets.
conda install -c conda-forge dask-jobqueue
pip install dask-jobqueue
Then check out the dask-jobqueue documentation: http://jobqueue.dask.org
Large high-performance computing (HPC) clusters are ubiquitous throughout the computational sciences. These systems combine powerful hardware: many large compute nodes, high-speed interconnects, and parallel file systems. One such system that we use at NCAR is Cheyenne, a fairly large machine with about 150k cores and over 300 TB of total memory.
Cheyenne is a 5.34-petaflops, high-performance computer operated by NCAR.
These systems frequently use a job queueing system, such as PBS, Slurm, or SGE, to manage the queueing and execution of many concurrent jobs from numerous users. A “job” is a single execution of a program that is to be run on some set of resources on the user’s HPC system. These jobs are often submitted via the command line:
qsub do_thing_a.sh
where do_thing_a.sh is a shell script that might look something like this:
#!/bin/bash
#PBS -N thing_a
#PBS -q premium
#PBS -A 123456789
#PBS -l select=1:ncpus=36:mem=109G
echo "doing thing A"
In this example “-N” specifies the name of this job, “-q” specifies the queue where the job should be run, “-A” specifies a project code to bill for the CPU time used while the job is run, and “-l” specifies the hardware specifications for this job. Each job queueing system has slightly different syntax for configuring and submitting these jobs.
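To make the mapping between these flags and a job script concrete, here is a minimal Python sketch that renders the same PBS header from keyword arguments. The helper name `render_pbs_script` and its parameters are hypothetical; they only illustrate the kind of templating that has to be repeated for each queueing system, and are not how any particular tool implements it.

```python
def render_pbs_script(name, queue, project, resource_spec, command):
    """Render a PBS job script as a string (illustrative helper, not a real API)."""
    lines = [
        "#!/bin/bash",
        f"#PBS -N {name}",           # job name
        f"#PBS -q {queue}",          # queue the job should run in
        f"#PBS -A {project}",        # project code billed for the CPU time
        f"#PBS -l {resource_spec}",  # hardware requested for the job
        command,                     # the work the job actually performs
    ]
    return "\n".join(lines) + "\n"


script = render_pbs_script(
    name="thing_a",
    queue="premium",
    project="123456789",
    resource_spec="select=1:ncpus=36:mem=109G",
    command='echo "doing thing A"',
)
print(script)
```

A Slurm or SGE version would need a different set of directive prefixes and flag names, which is exactly the per-system variation mentioned above.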
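This interface has led to the development of a few common workflow patterns: parallelizing with MPI, splitting the work across many batch jobs, or simply running the analysis serially.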
None of the workflow patterns listed above allow for interactive analysis of very large datasets. When I’m prototyping a new processing method, I often want to work interactively, say in a Jupyter Notebook. Writing MPI code on the fly is hard and expensive, batch jobs are inherently not interactive, and serial just won’t do when I start working on many TBs of data. Our experience is that these workflows tend to be fairly inelegant and difficult to transfer between applications, yielding lots of duplicated effort along the way.
One of the aims of the Pangeo project is to facilitate interactive data analysis on very large datasets. Pangeo leverages Jupyter and dask, along with a number of more domain-specific packages like xarray, to make this possible. The problem is we didn’t have a particularly palatable method for deploying dask on our HPC clusters.
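from dask_jobqueue import PBSCluster
from dask.distributed import Client
# memory and queue are illustrative values mirroring the PBS script above
cluster = PBSCluster(cores=36,
                     memory="109GB",
                     queue="premium")
client = Client(cluster)  # send computations to the cluster's workers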
Dask-jobqueue is easily customizable to help users capitalize on advanced HPC features. A more complicated example that would work on NCAR’s Cheyenne supercomputer is:
cluster = PBSCluster(cores=36,
In this example, we instruct the PBSCluster to 1) use up to 36 cores per job, 2) use 18 worker processes per job, 3) use the large memory nodes with 109 GB each, 4) use a longer walltime than is standard, 5) use the InfiniBand network interface (ib0), and 6) use the fast SSD disks as its local directory space.
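As a sketch of how the six settings listed above could map onto `PBSCluster` keyword arguments, and what they imply for each worker: the `walltime` and `local_directory` values below are assumptions chosen for illustration, and `cheyenne_kwargs` and `make_cluster` are hypothetical names. The arithmetic assumes a job's cores and memory are divided evenly among its worker processes.

```python
memory_gb = 109  # memory of one large-memory node, in GB

# Keyword arguments mirroring the six settings described above.
cheyenne_kwargs = dict(
    cores=36,                    # 1) cores per job
    processes=18,                # 2) worker processes per job
    memory=f"{memory_gb}GB",     # 3) large-memory node
    walltime="02:00:00",         # 4) assumed value, not taken from the text
    interface="ib0",             # 5) InfiniBand network interface
    local_directory="$TMPDIR",   # 6) assumed location of the fast SSD scratch space
)


def make_cluster(**kwargs):
    """Build the cluster; needs dask-jobqueue installed and a PBS scheduler."""
    from dask_jobqueue import PBSCluster  # lazy import so the arithmetic below runs anywhere
    return PBSCluster(**kwargs)


# Cores and memory of a job are divided among its worker processes.
threads_per_worker = cheyenne_kwargs["cores"] // cheyenne_kwargs["processes"]
memory_per_worker_gb = memory_gb / cheyenne_kwargs["processes"]
print(f"{threads_per_worker} threads, {memory_per_worker_gb:.2f} GB per worker")
```

On a login node with dask-jobqueue available, `make_cluster(**cheyenne_kwargs)` would create the cluster object; the exact values should be adapted to the site's queues and file systems.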
Finally, Dask offers the ability to “autoscale” clusters based on a set of heuristics. When the cluster needs more CPU or memory, it will scale up. When the cluster has unused resources, it will scale down. Dask-jobqueue supports this with a simple interface:
cluster.adapt(minimum=18, maximum=360)
In this example, we tell our cluster to autoscale between 18 and 360 workers (or 1 and 20 jobs).
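The equivalence between workers and jobs stated here follows from the 18 worker processes per job configured earlier. A minimal sketch of that conversion, using a hypothetical helper `jobs_needed` (not a dask-jobqueue function):

```python
import math


def jobs_needed(workers, processes_per_job=18):
    """Number of batch jobs required to host the requested number of workers."""
    return math.ceil(workers / processes_per_job)


minimum_jobs = jobs_needed(18)   # lower autoscaling bound, in jobs
maximum_jobs = jobs_needed(360)  # upper autoscaling bound, in jobs
print(minimum_jobs, maximum_jobs)
```

Because workers are started one whole job at a time, a request for 19 workers would already require a second job.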
We have put together a fairly comprehensive screencast that walks users through all the steps of setting up Jupyter and Dask (and dask-jobqueue) on an HPC cluster.
Dask-jobqueue makes it much easier to deploy Dask on HPC clusters. The package provides a Pythonic interface to common job-queueing systems. It is also easily customizable.
The autoscaling functionality allows for a fundamentally different way to do science on HPC clusters. Start your Jupyter Notebook, instantiate your dask cluster, and then do science, letting dask determine when to scale up and down depending on the computational demand. We think this bursting approach to interactive parallel computing offers many benefits.
Finally, in developing dask-jobqueue, we’ve run into a few challenges that are worth mentioning.