If I spin up a
Simply put, with Python 3.5 and 3.6, the max_workers argument is not allowed to be 0 for sanity reasons (the ThreadPoolExecutor constructor raises a ValueError). So my solution was to write a more "mocky" version of ThreadPoolExecutor that records whether anything was ever submitted to the pool, so that I can make assertions about that afterwards. I'll share the code here in case someone wants to reuse it for their own purposes.
    import threading
    from concurrent import futures


    class RecordingThreadPool(futures.Executor):
        """A thread pool that records if used."""

        def __init__(self, max_workers):
            self._tp_executor = futures.ThreadPoolExecutor(max_workers=max_workers)
            self._lock = threading.Lock()
            self._was_used = False

        def submit(self, fn, *args, **kwargs):
            # Record the submission under the lock, then delegate to the real pool.
            with self._lock:
                self._was_used = True
            # Return the future so callers can still wait on the result.
            return self._tp_executor.submit(fn, *args, **kwargs)

        def was_used(self):
            with self._lock:
                return self._was_used
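For what it's worth, here is a minimal sketch of how such a pool might be used in a test. The class is repeated so the snippet runs on its own. The process function and check_pool_usage helper are hypothetical names made up for illustration, and the shutdown override is my own assumption (it is not part of the class above); it only delegates to the wrapped executor so the worker threads get cleaned up.

```python
import threading
from concurrent import futures


class RecordingThreadPool(futures.Executor):
    """Recording wrapper, repeated here so the sketch is self-contained."""

    def __init__(self, max_workers):
        self._tp_executor = futures.ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()
        self._was_used = False

    def submit(self, fn, *args, **kwargs):
        with self._lock:
            self._was_used = True
        return self._tp_executor.submit(fn, *args, **kwargs)

    def was_used(self):
        with self._lock:
            return self._was_used

    def shutdown(self, wait=True):
        # Assumption: added for this sketch, delegates to the wrapped pool.
        self._tp_executor.shutdown(wait=wait)


def process(items, executor):
    """Hypothetical code under test: submits one task per item."""
    return [executor.submit(lambda x: x * 2, item) for item in items]


def check_pool_usage():
    # No items, so nothing should ever be submitted to the pool.
    idle_pool = RecordingThreadPool(max_workers=1)
    process([], idle_pool)
    idle = idle_pool.was_used()
    idle_pool.shutdown()

    # With items, the pool should record that it was used.
    busy_pool = RecordingThreadPool(max_workers=1)
    results = [f.result() for f in process([1, 2, 3], busy_pool)]
    busy = busy_pool.was_used()
    busy_pool.shutdown()
    return idle, busy, results
```

In a real test you would pass the recording pool into whatever code normally takes an executor and then assert on was_used(), instead of trying to construct a pool with zero workers.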