I've got some Python code that farms out expensive jobs using a `ThreadPoolExecutor`, and I'd like to keep track of which of them have completed, so that if I have to restart this system, I don't have to redo the work that already finished. In a single-threaded context, I could just mark what I've done in a shelf. Here's a naive port of that idea to a multithreaded environment:
    from concurrent.futures import ThreadPoolExecutor
    import shelve
    import subprocess

    def do_thing(done, x):
        # Don't let the command run in the background; we want to be able to tell when it's done
        _ = subprocess.check_output(["some_expensive_command", x])
        done[x] = True

    futs = []
    with shelve.open("done") as done:
        with ThreadPoolExecutor(max_workers=18) as executor:
            for x in things_to_do:
                if not done.get(x, False):
                    futs.append(executor.submit(do_thing, done, x))
                    # Can't run `done[x] = True` here--have to wait until do_thing finishes
            for future in futs:
                # Don't want to wait until here to mark stuff done, as the whole system
                # might be killed at some point before we get through all of things_to_do
                future.result()
While you're still inside the outermost `with` context manager, the `done` shelf is just a normal Python object; its contents are only guaranteed to be flushed to disk when the context manager closes and runs its `__exit__` method. It is therefore just as thread-safe as any other Python object, thanks to the GIL (as long as you're using CPython). Specifically, the assignment `done[x] = True` is thread-safe and will be performed atomically.
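If you'd rather not lean on an implementation detail of CPython, you can make the serialization explicit with a lock around the shelf write. A minimal sketch of the question's worker (the `shelf_lock` name is my own):

    import subprocess
    import threading

    shelf_lock = threading.Lock()  # one lock guarding all writes to the shelf

    def do_thing(done, x):
        _ = subprocess.check_output(["some_expensive_command", x])
        # Shelf.__setitem__ pickles the value and hands it to the underlying
        # dbm, which spans several bytecodes; the lock makes the write
        # serialization explicit instead of relying on GIL scheduling.
        with shelf_lock:
            done[x] = True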
It's important to note that while the shelf's `__exit__` method will run after a Ctrl-C, it won't run if the Python process ends abruptly (e.g. a SIGKILL or power loss), in which case the shelf won't be saved to disk.
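You can narrow (though not close) that window by pushing each write toward disk as soon as it happens, e.g. with `Shelf.sync()`. A sketch, with the caveat that whether the bytes actually reach disk before a hard kill still depends on the dbm backend and the OS:

    import subprocess

    def do_thing(done, x):
        _ = subprocess.check_output(["some_expensive_command", x])
        done[x] = True
        # sync() flushes shelve's writeback cache (if any) and asks the
        # underlying dbm to sync if it supports it; this reduces, but does
        # not eliminate, the risk of losing the entry on an abrupt kill.
        done.sync()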
To properly protect against this kind of failure, I would suggest using a lightweight, file-based, thread-safe database such as SQLite via Python's built-in `sqlite3` module.
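Here's a minimal sketch of what that could look like; the `done.db` file name and the `done` table are made-up names, and each call opens its own connection because `sqlite3` connections can't be shared across threads by default:

    import sqlite3

    DB_PATH = "done.db"  # hypothetical file name

    def init_db():
        conn = sqlite3.connect(DB_PATH)
        try:
            with conn:  # commits on success, rolls back on error
                conn.execute("CREATE TABLE IF NOT EXISTS done (key TEXT PRIMARY KEY)")
        finally:
            conn.close()

    def is_done(x):
        conn = sqlite3.connect(DB_PATH)
        try:
            return conn.execute("SELECT 1 FROM done WHERE key = ?", (x,)).fetchone() is not None
        finally:
            conn.close()

    def mark_done(x):
        # Safe to call from worker threads: each call has its own connection,
        # and SQLite serializes writers at the database level.
        conn = sqlite3.connect(DB_PATH)
        try:
            with conn:
                conn.execute("INSERT OR IGNORE INTO done (key) VALUES (?)", (x,))
        finally:
            conn.close()

Unlike the in-memory state of an open shelf, each committed row is on disk by the time `mark_done` returns (with SQLite's default journal settings), so completed work survives even a hard kill of the process.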