Siddharth Shukla - 2 months ago
Python Question

concurrent.futures.ThreadPoolExecutor max_workers can't be 0

If I spin up a

it works with Python 3.4 and Python 2.7 but raises an error with Python 3.5 and Python 3.6. I'm trying to create a
where I want to ensure that no task gets added to the thread pool. Currently, I created a subclass of
and raised an exception in the overridden
method. Is there a better way to do this?


Simply put, in Python 3.5 and 3.6 the max_workers argument is not allowed to be 0 for sanity reasons: the ThreadPoolExecutor constructor raises a ValueError for any value less than or equal to 0. So my solution was to write a more "mocky" version of ThreadPoolExecutor that records whether anything was submitted to the pool, so that a test can make assertions about it afterwards. I'll share the code here in case someone wants to reuse it for their own purposes.
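
To make the version difference concrete, here is a minimal sketch of the failing call. The helper name try_create_pool is hypothetical and used only for illustration. On Python 3.5 and later the constructor rejects the value with a ValueError; on the older versions mentioned in the question the constructor reportedly accepts it.

```python
from concurrent import futures


def try_create_pool(max_workers):
    """Return an executor, or None if the constructor rejects max_workers.

    Hypothetical helper, for illustration only.
    """
    try:
        return futures.ThreadPoolExecutor(max_workers=max_workers)
    except ValueError as exc:
        # Python 3.5+ refuses max_workers <= 0 at construction time.
        print("rejected:", exc)
        return None


pool = try_create_pool(0)
```

On Python 3.5+ the variable pool ends up as None here, which is why a zero-worker pool cannot be used as a "nothing should run" guard on those versions.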

import threading
from concurrent import futures

class RecordingThreadPool(futures.Executor):
  """A thread pool that records if used."""
  def __init__(self, max_workers):
    self._tp_executor = futures.ThreadPoolExecutor(max_workers=max_workers)
    self._lock = threading.Lock()
    self._was_used = False

  def submit(self, fn, *args, **kwargs):
    with self._lock:
      self._was_used = True
    # Return the future so callers can still wait on the result.
    return self._tp_executor.submit(fn, *args, **kwargs)

  def shutdown(self, wait=True):
    # Delegate so the wrapped pool's worker threads are released.
    self._tp_executor.shutdown(wait=wait)

  def was_used(self):
    with self._lock:
      return self._was_used
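
As a possible alternative to a hand-written recording class, the standard library's unittest.mock (Python 3.3+) can wrap submit on a real executor and record the calls. This is only a sketch under assumptions: code_under_test is a hypothetical stand-in for whatever code is being tested, and the pool is created with one worker so the constructor accepts it on every version.

```python
from concurrent import futures
from unittest import mock


def code_under_test(executor, items):
    """Hypothetical function under test: submits one task per item."""
    return [executor.submit(str, item) for item in items]


executor = futures.ThreadPoolExecutor(max_workers=1)

# wraps= forwards each call to the real submit while recording it.
with mock.patch.object(executor, "submit", wraps=executor.submit) as spy:
    # Nothing to do, so nothing should reach the pool.
    code_under_test(executor, [])
    spy.assert_not_called()

    # Two items lead to two recorded submissions.
    results = code_under_test(executor, [1, 2])
    call_count = spy.call_count

executor.shutdown()
```

The trade-off is that the spy lives only inside the with block, whereas the recording class can be passed around and queried later through was_used().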