Sep 12, 2016

Dask Distributed Release 1.13.0

I’m pleased to announce a release of Dask’s distributed scheduler, dask.distributed, version 1.13.0.

conda install dask distributed -c conda-forge
or
pip install dask distributed --upgrade

The last few months have seen a number of important user-facing features:

  • Executor is renamed to Client
  • Workers can spill excess data to disk when they run out of memory
  • The Client.compute and Client.persist methods for dealing with dask collections (like dask.dataframe or dask.delayed) gain the ability to restrict sub-components of the computation to different parts of the cluster with a workers= keyword argument.
  • IPython kernels can be deployed on the workers and scheduler for interactive debugging.
  • The Bokeh web interface has gained new plots and improved the visual styling of old ones.

Additionally, there are beta features in current development. These features are available now, but may change without warning in future versions. Experimentation and feedback by users comfortable with living on the bleeding edge is most welcome:

  • Clients can publish named datasets on the scheduler to share between them
  • Tasks can launch other tasks
  • Workers can restart themselves in new software environments provided by the user

There have also been significant internal changes. Other than increased performance, these changes should not be directly apparent.

  • The scheduler was refactored to a more state-machine-like architecture (see the doc page)
  • Short-lived connections are now managed by a connection pool
  • Work stealing has changed and grown more responsive (see the doc page)
  • General resilience improvements

The rest of this post contains very brief explanations of the topics above. Some of these topics may become blog posts of their own at some point. Until then I encourage people to look at the distributed scheduler’s documentation, which is separate from dask’s normal documentation and so may contain new information for some readers (Google Analytics reports about 5-10x the readership on http://dask.readthedocs.org than on http://distributed.readthedocs.org).

Major Changes and Features

Rename Executor to Client

http://distributed.readthedocs.io/en/latest/api.html

The term Executor was originally chosen to coincide with the concurrent.futures Executor interface, which is what defines the behavior for the .submit, .map, .result methods and Future object used as the primary interface.

Unfortunately, this is the same term used by projects like Spark and Mesos for “the low-level thing that executes tasks on each of the workers”, causing significant confusion when communicating with other communities or for transitioning users.

In response we rename Executor to a somewhat more generic term, Client, to designate its role as the thing users interact with to control their computations.

>>> from distributed import Executor # Old
>>> e = Executor() # Old

>>> from distributed import Client # New
>>> c = Client() # New

Executor remains an alias for Client and will continue to be valid for some time, but there may be some backwards-incompatible changes for internal use of executor= keywords within methods. Newer examples and materials will all use the term Client.
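For readers who have not used the concurrent.futures interface mentioned above, here is a minimal sketch of the submit / map / result pattern using only the standard library. It uses a local ThreadPoolExecutor rather than dask; the inc function is a made-up example.

```python
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    """Toy function used only for illustration."""
    return x + 1


with ThreadPoolExecutor(max_workers=2) as executor:
    # submit schedules one call and returns a Future immediately
    future = executor.submit(inc, 10)
    # result blocks until the call has finished and returns its value
    single = future.result()
    # map applies the function to every element of an iterable
    many = list(executor.map(inc, range(3)))

print(single, many)
```

The distributed Client follows the same general shape, with the work running on the cluster instead of in local threads.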

Workers Spill Excess Data to Disk

http://distributed.readthedocs.io/en/latest/worker.html#spill-excess-data-to-disk

When workers get close to running out of memory they can send excess data to disk. This is not on by default and instead requires adding the --memory-limit=auto option to dask-worker.

dask-worker scheduler:8786 # Old
dask-worker scheduler:8786 --memory-limit=auto # New

This will eventually become the default (and already is when using LocalCluster), but we’d like to see how things progress and phase it in slowly.

Generally this feature should improve robustness and allow the solution of larger problems on smaller clusters, although with a performance cost. Dask’s policies to reduce memory use through clever scheduling remain in place, so in the common case you should never need this feature, but it’s nice to have as a failsafe.
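To make the idea concrete, here is a toy sketch of a store that keeps values in memory up to a byte budget and pickles the oldest values to disk beyond that. This is an illustration only and not the worker's actual implementation: the SpillingStore class, the use of sys.getsizeof for size accounting, and the oldest-first eviction policy are all assumptions made for the example.

```python
import os
import pickle
import sys
import tempfile


class SpillingStore:
    """Toy key-value store that moves its oldest values to disk when full.

    Hypothetical illustration; size accounting and eviction order are
    assumptions, not dask's behavior.
    """

    def __init__(self, memory_limit, directory):
        self.memory_limit = memory_limit  # budget in bytes for in-memory values
        self.directory = directory        # where spilled values are written
        self.fast = {}                    # key -> value held in memory
        self.slow = {}                    # key -> path of pickled value on disk
        self._counter = 0                 # used to build unique file names

    def _in_memory_bytes(self):
        return sum(sys.getsizeof(v) for v in self.fast.values())

    def __setitem__(self, key, value):
        self.slow.pop(key, None)          # forget any stale on-disk copy
        self.fast[key] = value
        # Spill the oldest entries until we are back under budget,
        # always keeping the most recent value in memory.
        while len(self.fast) > 1 and self._in_memory_bytes() > self.memory_limit:
            old_key = next(iter(self.fast))
            path = os.path.join(self.directory, f"{self._counter}.pkl")
            self._counter += 1
            with open(path, "wb") as f:
                pickle.dump(self.fast.pop(old_key), f)
            self.slow[old_key] = path

    def __getitem__(self, key):
        if key in self.fast:
            return self.fast[key]
        # Slower path: read the value back from disk
        with open(self.slow[key], "rb") as f:
            return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp:
    store = SpillingStore(memory_limit=3000, directory=tmp)
    for i in range(5):
        store[i] = bytes(1000)            # five values of roughly 1 kB each
    spilled = sorted(store.slow)
    in_memory = sorted(store.fast)
    roundtrip_ok = store[0] == bytes(1000)

print("on disk:", spilled, "in memory:", in_memory)
```

Reads of spilled keys still succeed but pay for disk access and deserialization, which is the performance cost mentioned above.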

Enable restriction of valid workers for compute and persist methods

http://distributed.readthedocs.io/en/latest/locality.html#user-control

Expert users of the distributed scheduler will be aware of the ability to restrict certain tasks to run only on certain computers. This tends to be useful when dealing with GPUs or with special databases or instruments only available on some machines.

Previously this option was available only on the submit, map, and scatter methods, forcing people to use the more immediate interface. Now the dask collection interface functions compute and persist support this keyword as well.
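A minimal sketch of the workers= keyword on Client.compute follows. Several details are assumptions made for the example rather than taken from this release: it starts a small in-process cluster so that it is self-contained, it picks a worker address from client.scheduler_info(), and inc is a made-up function. On a real cluster you would pass the address or hostname of the machine that has the GPU or database in question.

```python
import dask
from dask.distributed import Client


@dask.delayed
def inc(x):
    """Toy function used only for illustration."""
    return x + 1


# Assumption: a local in-process cluster stands in for a real deployment
with Client(processes=False, n_workers=2, threads_per_worker=1,
            dashboard_address=None) as client:
    # Addresses of the workers currently known to the scheduler
    addresses = list(client.scheduler_info()["workers"])
    target = addresses[0]

    # Ask that this computation run only on the chosen worker
    future = client.compute(inc(1), workers=[target])
    result = future.result()

print(result)
```

Client.persist accepts the same keyword for collections that should stay in memory on particular workers.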

IPython Integration

http://distributed.readthedocs.io/en/latest/ipython.html

You can start IPython kernels on the workers or scheduler and then access them directly using either IPython magics or the QTConsole. This tends to be valuable when things go wrong and you want to interactively debug on the worker nodes themselves.

Start IPython on the Scheduler

>>> client.start_ipython_scheduler() # Start IPython kernel on the scheduler
>>> %scheduler scheduler.processing # Use IPython magics to inspect scheduler
{'127.0.0.1:3595': ['inc-1', 'inc-2'],
'127.0.0.1:53589': ['inc-2', 'add-5']}

Start IPython on the Workers

>>> info = client.start_ipython_workers() # Start IPython kernels on all workers
>>> list(info)
['127.0.0.1:3595', '127.0.0.1:53589']
>>> %remote info['127.0.0.1:3595'] worker.active # Use IPython magics
{'inc-1', 'inc-2'}

Bokeh Interface

http://distributed.readthedocs.io/en/latest/web.html

The Bokeh web interface to the cluster continues to evolve, both by improving existing plots and by adding new plots and new pages.

[Figure: dask progress bar]

For example, the progress bars have become more compact and shrink down dynamically to respond to additional bars.

We’ve also added extra tables and plots to monitor workers, such as their memory use and current backlog of tasks.

Experimental Features

The features described below are experimental and may change without warning. Please do not depend on them in stable code.

Publish Datasets

http://distributed.readthedocs.io/en/latest/publish.html

You can now save collections on the scheduler, allowing you to come back to the same computations later or allowing collaborators to see and work off of your results. This can be useful in the following cases:

  1. There is a dataset on which you frequently base computations, and you want that dataset always in memory and easy to access without having to recompute it each time you start work, even if you disconnect.
  2. You want to send results to a colleague working on the same Dask cluster and have them get immediate access to your computations without having to send them a script and without them having to repeat the work on the cluster.

Example: Client One

from dask.distributed import Client
client = Client('scheduler-address:8786')

import dask.dataframe as dd
df = dd.read_csv('s3://my-bucket/*.csv')
df2 = df[df.balance < 0]
df2 = client.persist(df2)

>>> df2.head()
name balance
0 Alice -100
1 Bob -200
2 Charlie -300
3 Dennis -400
4 Edith -500

client.publish_dataset(accounts=df2)

Example: Client Two

>>> from dask.distributed import Client
>>> client = Client('scheduler-address:8786')

>>> client.list_datasets()
['accounts']

>>> df = client.get_dataset('accounts')
>>> df.head()
name balance
0 Alice -100
1 Bob -200
2 Charlie -300
3 Dennis -400
4 Edith -500

Launch Tasks from Tasks

http://distributed.readthedocs.io/en/latest/task-launch.html

You can now submit tasks to the cluster that themselves submit more tasks. This allows the submission of highly dynamic workloads that can shape themselves depending on future computed values without ever checking back in with the original client.

This is accomplished by starting new local Clients within the task that can interact with the scheduler.

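from distributed import Client

def func():
    # Runs on a worker: open a local client connected to the same scheduler
    from distributed import local_client
    with local_client() as c2:
        future = c2.submit(...)  # submit further tasks from inside this task

c = Client(...)
future = c.submit(func)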

There are a few straightforward use cases for this, like iterative algorithms with stopping criteria, but also many novel use cases including streaming and monitoring systems.
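As a single-machine analogy for the iterative case, the sketch below uses a standard-library thread pool rather than dask. Each task runs one Newton iteration for a square root and, if the stopping criterion is not yet met, submits the next iteration itself, so the original caller submits only once. The newton_step function and the use of a queue to hand back the final value are choices made for this illustration, not part of dask.distributed.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def newton_step(pool, results, x, target, tol):
    """Run one Newton iteration for sqrt(target), then either stop or
    submit the next iteration from inside the running task."""
    new_x = 0.5 * (x + target / x)
    if abs(new_x - x) < tol:
        results.put(new_x)  # stopping criterion met: report the answer
    else:
        # The task launches its own successor; the caller is not involved
        pool.submit(newton_step, pool, results, new_x, target, tol)


results = queue.Queue()
pool = ThreadPoolExecutor(max_workers=2)

# The caller submits a single task and then just waits for the final value
pool.submit(newton_step, pool, results, 1.0, 2.0, 1e-12)
root = results.get(timeout=10)
pool.shutdown()

print(root)
```

With the distributed scheduler the same pattern would use a local client inside the task, as in the snippet above, so that follow-up tasks can run anywhere on the cluster.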

Restart Workers in Redeployable Python Environments

You can now zip up and distribute full Conda environments, and ask dask-workers to restart themselves, live, in that environment. This involves the following:

  1. Create a conda environment locally (or any redeployable directory including a python executable)
  2. Zip up that environment and use the existing dask.distributed network to copy it to all of the workers
  3. Shut down all of the workers and restart them within the new environment

This helps users experiment with different software environments with a much faster turnaround time (typically tens of seconds) than asking IT to install libraries or building and deploying Docker containers (which is also a fine solution). Note that the typical solution of uploading individual Python scripts or egg files has been around for a while; see the API docs for upload_file.

Acknowledgements

Since version 1.12.0 on August 18th, the following people have contributed commits to the dask/distributed repository:

  • Dave Hirschfeld
  • dsidi
  • Jim Crist
  • Joseph Crail
  • Loïc Estève
  • Martin Durant
  • Matthew Rocklin
  • Min RK
  • Scott Sievert