xApple xApple - 7 months ago 34
Python Question

Python thread pool that handles exceptions

I've been looking around a good implementation of a simple python thread pool pattern and really can't find anything that suits my needs. I'm using python 2.7 and all the modules I have found either don't work, or don't handle exceptions in the workers properly. I was wondering if someone knew of a library that could offer the type of functionality I'm searching for. Help greatly appreciated.

Multiprocessing



My first attempt was with the built-in
multiprocessing
module, but as this doesn't use threads but subprocesses instead we run into the problem that objects cannot be pickled. No go here.

from multiprocessing import Pool

class Sample(object):
def compute_fib(self, n):
phi = (1 + 5**0.5) / 2
self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = Pool(processes=8)
for s in samples: pool.apply_async(s.compute_fib, [20])
pool.join()
for s in samples: print s.fib

# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed


Futures



So I see there is a back port of some of the cool concurrent features of python 3.2 here. This seems perfect and simple to use. The problem is that when you get an exception in one of the workers, you only get the type of the exception such as "ZeroDivisionError" but no traceback and thus no indication of which line caused the exception. Code becomes impossible to debug. No go.

from concurrent import futures

class Sample(object):
def compute_fib(self, n):
phi = (1 + 5**0.5) / 2
1/0
self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = futures.ThreadPoolExecutor(max_workers=8)
threads = [pool.submit(s.compute_fib, 20) for s in samples]
futures.wait(threads, return_when=futures.FIRST_EXCEPTION)
for t in threads: t.result()
for s in samples: print s.fib


# futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self)
# 354 def __get_result(self):
# 355 if self._exception:
#--> 356 raise self._exception
# 357 else:
# 358 return self._result
#
# ZeroDivisionError: integer division or modulo by zero


Workerpool



I found an other implementation of this pattern here. This time when an exception occurs it is printed, but then my ipython interactive interpreter is left in a hanging state and needs to be killed from an other shell. No go.

import workerpool

class Sample(object):
def compute_fib(self, n):
phi = (1 + 5**0.5) / 2
1/0
self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = workerpool.WorkerPool(size=8)
for s in samples: pool.map(s.compute_fib, [20])
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ^C^C^C^C^C^C^C^C^D^D
# $ kill 1783


Threadpool



Yet an other implementation here. This time when an exception occurs, it is printed to the
stderr
but the script is not interrupted and instead continues executing, which defies the purpose of the exception and can make things unsafe. Still not usable.

import threadpool

class Sample(object):
def compute_fib(self, n):
phi = (1 + 5**0.5) / 2
1/0
self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = threadpool.ThreadPool(8)
requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples]
requests = [y for x in requests for y in x]
for r in requests: pool.putRequest(r)
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
#---> 17 for s in samples: print s.fib
#
#AttributeError: 'Sample' object has no attribute 'fib'


- Update -



It appears that concerning the
futures
library, the behavior of python 3 is not the same as python 2.

futures_exceptions.py
:

from concurrent.futures import ThreadPoolExecutor, as_completed

def div_zero(x):
return x / 0

with ThreadPoolExecutor(max_workers=4) as executor:
futures = executor.map(div_zero, range(4))
for future in as_completed(futures): print(future)


Python 2.7.6 output:

Traceback (most recent call last):
File "...futures_exceptions.py", line 12, in <module>
for future in as_completed(futures):
File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed
with _AcquireFutures(fs):
File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__
self.futures = sorted(futures, key=id)
File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map
yield future.result()
File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
return self.__get_result()
File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
raise self._exception
ZeroDivisionError: integer division or modulo by zero


Python 3.3.2 output:

Traceback (most recent call last):
File "...futures_exceptions.py", line 11, in <module>
for future in as_completed(futures):
File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed
with _AcquireFutures(fs):
File "...python3.3/concurrent/futures/_base.py", line 142, in __init__
self.futures = sorted(futures, key=id)
File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator
yield future.result()
File "...python3.3/concurrent/futures/_base.py", line 392, in result
return self.__get_result()
File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result
raise self._exception
File "...python3.3/concurrent/futures/thread.py", line 54, in run
result = self.fn(*self.args, **self.kwargs)
File "...futures_exceptions.py", line 7, in div_zero
return x / 0
ZeroDivisionError: division by zero

Answer

I personally use Futures as the interface is very simple. For the traceback issue I found a workaround to preserve it. Checkout my answer to this other question:

How to get correct line number where exception was thrown using concurrent.futures (python 2.7)