Jeff Jeff - 2 months ago 5x
Python Question

How to throttle a large number of tasks without using all workers

Imagine I have a dask grid with 10 workers and 40 cores in total. This is a shared grid, so I don't want to fully saturate it with my work. I have 1000 tasks to do, and I want to submit (and have actively running) a maximum of 20 tasks at a time.

To be concrete,

from time import sleep
from random import random

def inc(x):
    from random import random
    sleep(random() * 2)
    return x + 1

def double(x):
    from random import random
    return 2 * x

>>> from distributed import Executor
>>> e = Executor('')
>>> e
<Executor: scheduler= workers=10 threads=40>

If I set up a system of queues:

>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)

This works, but it dumps all of my tasks onto the grid at once, saturating it. Ideally I could write:

e.scatter(input_q, max_submit=20)

It seems that the example from the docs here would allow me to use a queue. But from a user perspective it looks like I would still have to deal with the backpressure myself. Ideally the executor would take care of this automatically.


Use maxsize=

You're very close. All of scatter, gather, and map take the same maxsize= keyword argument that Queue takes. So a simple workflow might be as follows:


from time import sleep

def inc(x):
    return x + 1

your_input_data = list(range(1000))

from queue import Queue              # Put your data into a queue
q = Queue()
for i in your_input_data:
    q.put(i)

from dask.distributed import Executor
e = Executor('')        # Connect to cluster

futures = e.map(inc, q, maxsize=20)  # Map inc over data
results = e.gather(futures)          # Gather results

L = []
while not q.empty() or not futures.empty() or not results.empty():
    L.append(results.get())  # this blocks waiting for all results

All of q, futures, and results are Python Queue objects. The q and results queues don't have a limit, so they'll greedily pull in as much as they can. The futures queue however has a maximum size of 20, so it will only allow 20 futures in flight at any given time. Once the leading future is complete it will immediately be consumed by the gather function and its result will be placed into the results queue. This frees up space in futures and causes another task to be submitted.
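The backpressure described above can be sketched with the standard library alone. This is only a stand-in, not the distributed API: it assumes a local ThreadPoolExecutor in place of the cluster, and throttled_map, max_workers, and the _DONE sentinel are hypothetical names introduced for illustration. The bounded queue of futures plays the role of the futures queue, and the loop that calls result() plays the role of gather. There is no error handling.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

_DONE = object()  # hypothetical sentinel marking the end of the stream


def throttled_map(func, items, maxsize=20, max_workers=40):
    """Apply func to items, keeping at most `maxsize` futures queued."""
    futures = Queue(maxsize=maxsize)  # bounded: put() blocks when full
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:

        def submitter():
            for item in items:
                # put() blocks while `maxsize` futures are already waiting,
                # so the remaining tasks are not submitted yet.
                futures.put(pool.submit(func, item))
            futures.put(_DONE)

        thread = threading.Thread(target=submitter, daemon=True)
        thread.start()

        while True:
            future = futures.get()  # always the oldest future
            if future is _DONE:
                break
            # Waiting on the leading future and removing it frees a slot,
            # which lets the submitter send one more task.
            results.append(future.result())
        thread.join()

    return results
```

Because a task is submitted just before its future is put on the queue, and one future is held by the consuming loop, the number of outstanding tasks in this sketch can briefly reach maxsize + 2 rather than exactly maxsize.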

Note that this isn't exactly what you wanted. These queues are ordered so futures will only get popped off when they're in the front of the queue. If all of the in-flight futures have finished except for the first they'll still stay in the queue, taking up space. Given this constraint you might want to choose a maxsize= slightly more than your desired 20 items.
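To get a feel for how much the ordering costs, here is a small back-of-the-envelope simulation. It is an illustrative model only, not a measurement of dask: it assumes there are always enough workers to start a task as soon as a queue slot opens, and the function names ordered_makespan and unordered_makespan are made up for this sketch. The first function frees a slot only when the leading task is done; the second frees a slot as soon as any task finishes.

```python
import heapq


def ordered_makespan(durations, window):
    """Total time when a slot is freed only once the leading task is done."""
    popped = []  # popped[i]: time at which task i leaves the queue
    for i, duration in enumerate(durations):
        # Task i can start only after task i - window has left the queue.
        start = popped[i - window] if i >= window else 0
        finish = start + duration
        previous = popped[-1] if popped else 0
        # A finished task still waits behind any unfinished earlier task.
        popped.append(max(previous, finish))
    return popped[-1] if popped else 0


def unordered_makespan(durations, window):
    """Total time when a slot is freed as soon as any task finishes."""
    running = []  # min-heap of finish times of tasks holding a slot
    end = 0
    for duration in durations:
        start = heapq.heappop(running) if len(running) == window else 0
        finish = start + duration
        heapq.heappush(running, finish)
        end = max(end, finish)
    return end


durations = [10, 1, 1, 1, 1, 1]            # one slow task at the front
print(ordered_makespan(durations, 2))      # 12
print(ordered_makespan(durations, 3))      # 11
print(unordered_makespan(durations, 2))    # 10
```

In this toy case the slow leading task holds up the short ones behind it, and widening the window from 2 to 3 recovers part of the lost time, which is the reason for choosing a maxsize= somewhat above the target.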

Extending this

Here we do a simple map->gather pipeline with no logic in between. You could also put other map computations in here or even pull futures out of the queues and do custom work with them on your own. It's easy to break out of the mold provided above.