I’m pleased to announce a release of Dask’s distributed scheduler, dask.distributed, version 1.13.0.
conda install dask distributed -c conda-forge
or
pip install dask distributed --upgrade
The last few months have seen a number of important user-facing features:
Additionally there are beta features in current development. These features are available now, but may change without warning in future versions. Experimentation and feedback by users comfortable with living on the bleeding edge is most welcome:
There have also been significant internal changes. Other than increased performance these changes should not be directly apparent.
The rest of this post will contain very brief explanations of the topics above. Some of these topics may become blogposts of their own at some point. Until then I encourage people to look at the distributed scheduler’s documentation, which is separate from dask’s normal documentation and so may contain new information for some readers (Google Analytics reports about 5-10x the readership on http://dask.readthedocs.org relative to http://distributed.readthedocs.org).
http://distributed.readthedocs.io/en/latest/api.html
The term Executor was originally chosen to coincide with the concurrent.futures Executor interface, which is what defines the behavior of the .submit, .map, and .result methods and the Future object used as the primary interface.
Unfortunately, this is the same term used by projects like Spark and Mesos for “the low-level thing that executes tasks on each of the workers”, causing significant confusion when communicating with other communities or for transitioning users.
In response we rename Executor to a somewhat more generic term, Client, to designate its role as the thing users interact with to control their computations.
>>> from distributed import Executor # Old
>>> e = Executor() # Old
>>> from distributed import Client # New
>>> c = Client() # New
Executor remains an alias for Client and will continue to be valid for some time, but there may be some backwards incompatible changes for internal use of executor= keywords within methods. Newer examples and materials will all use the term Client.
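For readers new to the concurrent.futures-style interface mentioned above, here is a minimal sketch of the .submit/.map/.result workflow using the new Client name (the scheduler address and the inc function are placeholders of my own):

from distributed import Client

def inc(x):
    return x + 1

client = Client('scheduler-address:8786')  # placeholder address

future = client.submit(inc, 1)          # submit a single task, receive a Future
futures = client.map(inc, range(4))     # submit many tasks at once
print(future.result())                  # block until the result arrives -> 2
print(client.gather(futures))           # collect many results -> [1, 2, 3, 4]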
http://distributed.readthedocs.io/en/latest/worker.html#spill-excess-data-to-disk
When workers get close to running out of memory they can send excess data to disk. This is not on by default and instead requires adding the --memory-limit=auto option to dask-worker.
dask-worker scheduler:8786 # Old
dask-worker scheduler:8786 --memory-limit=auto # New
This will eventually become the default (and is now when using LocalCluster) but we’d like to see how things progress and phase it in slowly.
Generally this feature should improve robustness and allow the solution of larger problems on smaller clusters, although with a performance cost. Dask’s policies to reduce memory use through clever scheduling remain in place, so in the common case you should never need this feature, but it’s nice to have as a failsafe.
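If you would rather set a hard cap than rely on the automatic choice, my understanding is that the flag also accepts an explicit byte count; the value below is purely illustrative and this numeric form is an assumption on my part:

dask-worker scheduler:8786 --memory-limit=4e9  # assumed: spill to disk above roughly 4 GB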
http://distributed.readthedocs.io/en/latest/locality.html#user-control
Expert users of the distributed scheduler will be aware of the ability to restrict certain tasks to run only on certain computers. This tends to be useful when dealing with GPUs or with special databases or instruments only available on some machines.
Previously this option was available only on the submit, map, and scatter methods, forcing people to use the more immediate interface. Now the dask collection interface functions compute and persist support this keyword as well.
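As a sketch of what this looks like on the collection interface (the worker address and bucket name below are made up, and I am assuming the list form of the workers= argument):

from distributed import Client
import dask.dataframe as dd

client = Client('scheduler-address:8786')

df = dd.read_csv('s3://my-bucket/*.csv')

# Pin this collection's tasks to a particular machine, e.g. one with a GPU
# or a local copy of the data (the address is hypothetical)
df = client.persist(df, workers=['192.168.1.100'])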
http://distributed.readthedocs.io/en/latest/ipython.html
You can start IPython kernels on the workers or scheduler and then access them directly using either IPython magics or the QTConsole. This tends to be valuable when things go wrong and you want to interactively debug on the worker nodes themselves.
Start IPython on the Scheduler
>>> client.start_ipython_scheduler() # Start IPython kernel on the scheduler
>>> %scheduler scheduler.processing # Use IPython magics to inspect scheduler
{'127.0.0.1:3595': ['inc-1', 'inc-2'],
'127.0.0.1:53589': ['inc-2', 'add-5']}
Start IPython on the Workers
>>> info = client.start_ipython_workers() # Start IPython kernels on all workers
>>> list(info)
['127.0.0.1:4595', '127.0.0.1:53589']
>>> %remote info['127.0.0.1:3595'] worker.active # Use IPython magics
{'inc-1', 'inc-2'}
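The QTConsole route mentioned above looks roughly like the following; the qtconsole= keyword is how I understand the API, so treat it as an assumption:

>>> client.start_ipython_scheduler(qtconsole=True)  # open a QtConsole on the scheduler kernel
>>> client.start_ipython_workers(qtconsole=True)    # and one per worker kernel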
http://distributed.readthedocs.io/en/latest/web.html
The Bokeh web interface to the cluster continues to evolve, both by improving existing plots and by adding new plots and new pages.
For example, the progress bars have become more compact and shrink down dynamically to respond to additional bars.
And we’ve added in extra tables and plots to monitor workers, such as their memory use and current backlog of tasks.
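If you have Bokeh installed the scheduler serves these pages itself; the port and path below are the defaults I am aware of, so treat them as assumptions for your deployment:

dask-scheduler  # then browse to http://scheduler-host:8787/status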
The features described below are experimental and may change without warning. Please do not depend on them in stable code.
http://distributed.readthedocs.io/en/latest/publish.html
You can now save collections on the scheduler, allowing you to come back to the same computations later or to let collaborators see and work off of your results. This can be useful in several situations; for example:
Example: Client One
from dask.distributed import Client
client = Client('scheduler-address:8786')
import dask.dataframe as dd
df = dd.read_csv('s3://my-bucket/*.csv')
df2 = df[df.balance < 0]
df2 = client.persist(df2)
>>> df2.head()
name balance
0 Alice -100
1 Bob -200
2 Charlie -300
3 Dennis -400
4 Edith -500
client.publish_dataset(accounts=df2)
Example: Client Two
>>> from dask.distributed import Client
>>> client = Client('scheduler-address:8786')
>>> client.list_datasets()
['accounts']
>>> df = client.get_dataset('accounts')
>>> df.head()
name balance
0 Alice -100
1 Bob -200
2 Charlie -300
3 Dennis -400
4 Edith -500
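Published datasets stay on the scheduler until they are removed; as I understand the API there is a matching unpublish_dataset method for that:

>>> client.unpublish_dataset('accounts')  # remove the named dataset from the scheduler
>>> client.list_datasets()
[]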
http://distributed.readthedocs.io/en/latest/task-launch.html
You can now submit tasks to the cluster that themselves submit more tasks. This allows the submission of highly dynamic workloads that can shape themselves depending on future computed values without ever checking back in with the original client.
This is accomplished by starting new local Clients within the task that can interact with the scheduler.
from distributed import Client

def func():
    # Runs as a task on a worker and submits further tasks itself
    from distributed import local_client
    with local_client() as c2:
        future = c2.submit(...)

c = Client(...)          # ordinary client on the user's machine
future = c.submit(func)  # the submitted task launches its own tasks
There are a few straightforward use cases for this, like iterative algorithms with stopping criteria, but also many novel use cases including streaming and monitoring systems.
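As a slightly fuller sketch of the iterative case (the inc function, the threshold, and the stopping condition are invented for illustration):

from distributed import Client, local_client

def inc(x):
    return x + 1

def iterate_until(threshold):
    # Runs on a worker and keeps launching tasks until the criterion is met,
    # without ever reporting back to the original client in between
    with local_client() as c:
        x = 0
        while x < threshold:
            x = c.submit(inc, x).result()
        return x

client = Client('scheduler-address:8786')
print(client.submit(iterate_until, 10).result())  # -> 10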
You can now zip up and distribute full Conda environments, and ask dask-worker processes to restart themselves, live, in that environment.
This helps users to experiment with different software environments with a much faster turnaround time (typically tens of seconds) than asking IT to install libraries or building and deploying Docker containers (which is also a fine solution). Note that the typical solution of uploading individual Python scripts or egg files has been around for a while; see the API docs for upload_file.
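For reference, that existing path looks like this (the file name is hypothetical):

from distributed import Client

client = Client('scheduler-address:8786')
client.upload_file('my_module.py')  # ship this .py (or .egg) file to every worker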
Since version 1.12.0 on August 18th, the following people have contributed commits to the dask/distributed repository: