SRD SRD - 3 months ago 14
Python Question

Stopping processes in ThreadPool in Python

I've been trying to write an interactive wrapper (for use in IPython) for a library that controls some hardware. Some calls are heavy on I/O, so it makes sense to carry out the tasks in parallel. Using a ThreadPool (almost) works nicely:

from multiprocessing.pool import ThreadPool

class hardware(object):
    def __init__(self, IPaddress):
        connect_to_hardware(IPaddress)  # placeholder for the library call

    def some_long_task_to_hardware(self, wtime):
        wait(wtime)  # placeholder for a slow, IO-bound hardware call
        result = 'blah'
        return result

pool = ThreadPool(processes=4)
threads = []
h = [hardware(IP1), hardware(IP2), hardware(IP3), hardware(IP4)]
for tt in range(4):
    # args must be a tuple, hence the trailing comma
    task = pool.apply_async(h[tt].some_long_task_to_hardware, (1000,))
    threads.append(task)
alive = [True]*4
try:
    while any(alive):
        for tt in range(4):
            alive[tt] = not threads[tt].ready()
        do_other_stuff_for_a_bit()
except:
    # some command I cannot find that will stop the threads...
    raise
for tt in range(4):
    print(threads[tt].get())


The problem comes if the user wants to stop the process or there is an IO error in do_other_stuff_for_a_bit(). Pressing Ctrl+C stops the main process but the worker threads carry on running until their current task is complete.

Is there some way to stop these threads without having to rewrite the library or have the user exit Python? pool.terminate() and pool.join(), which I have seen used in other examples, do not seem to do the job.

The actual routine (instead of the simplified version above) uses logging, and although all the worker threads are shut down at some point, I can see that the tasks they started carry on until complete (and, this being hardware, I can see their effect by looking across the room).

This is in Python 2.7.

UPDATE:

The solution seems to be to switch to using multiprocessing.Process instead of a thread pool. The test code I tried is to run foo_pulse:

import time

class foo(object):
    def foo_pulse(self, nPulse, name):  # just one method of *many*
        print('starting pulse for ' + name)
        result = []
        for ii in range(nPulse):
            print('on for ' + name)
            time.sleep(2)
            print('off for ' + name)
            time.sleep(2)
            result.append(ii)
        return result, name


If you try running this using ThreadPool, then ctrl-C does not stop foo_pulse from running (even though it appears to kill the threads right away, the print statements keep on coming):

from multiprocessing.pool import ThreadPool
import time

def test(nPulse):
    a = foo()
    pool = ThreadPool(processes=4)
    threads = []
    for rn in range(4):
        r = pool.apply_async(a.foo_pulse, (nPulse, 'loop ' + str(rn)))
        threads.append(r)
    alive = [True]*4
    try:
        while any(alive):  # wait until all threads complete
            for rn in range(4):
                alive[rn] = not threads[rn].ready()
            time.sleep(1)
    except:  # stop threads if user presses ctrl-c
        print('trying to stop threads')
        pool.terminate()
        print('stopped threads')  # this line prints but output from foo_pulse carried on.
        raise
    else:
        for t in threads:
            print(t.get())
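
The same effect can be reproduced without pressing ctrl-C. The sketch below is a scripted variant of the test above, assuming CPython; terminate_while_busy, slow_task and the two events are names made up for this illustration. It starts one task, calls pool.terminate() while the task is sleeping, and then reports whether the task still ran to the end.

```python
import threading
import time
from multiprocessing.pool import ThreadPool


def terminate_while_busy(task_seconds=0.5):
    """Call terminate() mid-task and report whether the task still finished."""
    started = threading.Event()
    finished = threading.Event()

    def slow_task():
        started.set()
        time.sleep(task_seconds)  # stands in for a blocking hardware call
        finished.set()            # only reached if the thread kept running

    pool = ThreadPool(processes=1)
    pool.apply_async(slow_task)
    started.wait(5)               # make sure a worker has picked the task up
    pool.terminate()              # discards queued work; cannot kill the thread
    return finished.wait(5)       # True if slow_task ran to completion anyway


if __name__ == "__main__":
    print("task completed after terminate():", terminate_while_busy())
```

If this reports True, the worker thread simply kept going after terminate(), which matches the output seen above: a ThreadPool can stop handing out new work, but the threading module offers no way to forcibly stop a thread that is already inside a task.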


However a version using multiprocessing.Process works as expected:

import multiprocessing as mp
import time

def test_pro(nPulse):
    pros = []
    ans = []
    a = foo()
    for rn in range(4):
        q = mp.Queue()
        ans.append(q)
        r = mp.Process(target=wrapper, args=(a, "foo_pulse", q), kwargs={'args': (nPulse, 'loop ' + str(rn))})
        r.start()
        pros.append(r)
    try:
        for p in pros:
            p.join()
        print('all done')
    except:  # stop threads if user stops findRes
        print('trying to stop threads')
        for p in pros:
            p.terminate()
        print('stopped threads')
    else:
        print('output here')
        for q in ans:
            print(q.get())
    print('exit time')


Here I have defined a wrapper for the library foo (so that it did not need to be rewritten). If the return value is not needed, then neither is this wrapper:

def wrapper(a, target, q, args=(), kwargs={}):
    '''Used when return value is wanted'''
    q.put(getattr(a, target)(*args, **kwargs))
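
One caveat when collecting return values through a Queue, as I read the multiprocessing documentation: a child that has put data on a queue may not exit until that data has been consumed, so joining before calling get() can hang once the results get large. Below is a minimal sketch that drains the queues first and joins afterwards. It reuses the wrapper signature from above; Repeater and collect are hypothetical names standing in for the library object and the calling code.

```python
import multiprocessing as mp


def wrapper(a, target, q, args=(), kwargs=None):
    """Call a.<target>(*args, **kwargs) and put the return value on q."""
    q.put(getattr(a, target)(*args, **(kwargs or {})))


class Repeater(object):
    """Hypothetical stand-in for the library object."""

    def repeat(self, text, n):
        return text * n  # deliberately large return value


def collect(n_procs=2, size=200000):
    """Run Repeater.repeat in n_procs processes and gather the results."""
    queues, procs = [], []
    for i in range(n_procs):
        q = mp.Queue()
        p = mp.Process(target=wrapper, args=(Repeater(), "repeat", q),
                       kwargs={"args": (str(i), size)})
        p.start()
        queues.append(q)
        procs.append(p)
    results = [q.get() for q in queues]  # drain every queue first...
    for p in procs:
        p.join()                         # ...then join, with nothing left unread
    return results
```

Called as collect() from a script (under an `if __name__ == "__main__":` guard on platforms that spawn rather than fork), this should return one string per process.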


From the documentation I see no reason why a pool would not work (other than a bug).
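
A possible explanation that does not require a bug: ThreadPool workers are threads, and the threading module has no way to kill a thread, whereas the workers of a regular multiprocessing.Pool are processes, which terminate() can stop. The following is only a sketch of that difference under those assumptions; slow_write, terminate_process_pool and the marker files are hypothetical names, and on platforms that spawn instead of fork the call would need to sit under an `if __name__ == "__main__":` guard.

```python
import multiprocessing as mp
import os
import tempfile
import time


def slow_write(started_path, done_path, delay):
    """Hypothetical long task: mark the start, wait, then mark the end."""
    open(started_path, "w").close()
    time.sleep(delay)                 # stands in for the hardware call
    open(done_path, "w").close()      # never reached if the process is killed


def terminate_process_pool(delay=1.0):
    """Terminate a process pool mid-task; return True if the task still finished."""
    tmp = tempfile.mkdtemp()
    started_path = os.path.join(tmp, "started")
    done_path = os.path.join(tmp, "done")
    pool = mp.Pool(processes=1)
    pool.apply_async(slow_write, (started_path, done_path, delay))
    deadline = time.time() + 5
    while not os.path.exists(started_path) and time.time() < deadline:
        time.sleep(0.01)              # wait until the worker is inside the task
    pool.terminate()                  # stops the worker processes
    pool.join()
    time.sleep(delay + 0.5)           # wait past the moment "done" would appear
    return os.path.exists(done_path)
```

terminate_process_pool() should come back as False when the worker was stopped before it could write the "done" marker. Swapping mp.Pool for ThreadPool in the same function would be one way to compare the two pool types side by side.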

Answer

This is a very interesting use of parallelism.

However, if you are using multiprocessing, the goal is to have many processes running in parallel, as opposed to one process running many threads.

Consider these few changes to implement it using multiprocessing:

You have these functions that will run in parallel:

import time
import multiprocessing as mp


def some_long_task_from_library(wtime):
    time.sleep(wtime)


class MyException(Exception): pass

def do_other_stuff_for_a_bit():
    time.sleep(5)
    raise MyException("Something Happened...")

Let's create and start the processes, say 4:

procs = []  # this is not a Pool, it is just a way to handle the
            # processes instead of calling them p1, p2, p3, p4...
for _ in range(4):
    p = mp.Process(target=some_long_task_from_library, args=(1000,))
    p.start()
    procs.append(p)
mp.active_children()   # lists the child processes still alive; it does not start them (p.start() did that)
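
A quick way to see what mp.active_children() reports, sketched under the assumption that the processes are started exactly as in the loop above; nap and count_live_children are illustrative names, not part of any library.

```python
import multiprocessing as mp
import time


def nap(seconds):
    time.sleep(seconds)  # placeholder for a long library call


def count_live_children(n=3, seconds=1.0):
    """Return how many of our processes are alive before and after join()."""
    procs = [mp.Process(target=nap, args=(seconds,)) for _ in range(n)]
    for p in procs:
        p.start()                      # this is the call that runs them
    live = mp.active_children()        # list of children that are still alive
    while_running = sum(1 for p in procs if p in live)
    for p in procs:
        p.join()                       # wait for every process to finish
    live = mp.active_children()
    after_join = sum(1 for p in procs if p in live)
    return while_running, after_join
```

start() is what launches each process; active_children() only reports on them, which makes it handy for a quick check from an interactive session.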

The processes are running in parallel, presumably each on a separate CPU core, but that is up to the OS to decide. You can check in your system monitor.

In the meantime, you run some code in the main process that will fail, and you want to stop the running processes rather than leave them orphaned:

try:
    do_other_stuff_for_a_bit()
except MyException as exc:
    print(exc)
    print("Now stopping all processes...")
    for p in procs:
        p.terminate()
print("The rest of the process will continue")
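
terminate() only asks the processes to stop. Here is a sketch of a slightly more defensive cleanup, assuming a Unix-like system: join each process after terminating it, then look at exitcode, which multiprocessing reports as the negative signal number for a process killed by a signal. long_task, stop_all and run_and_stop are illustrative names, and the RuntimeError merely simulates the failure in the main process.

```python
import multiprocessing as mp
import signal
import time


def long_task(seconds):
    time.sleep(seconds)  # placeholder for some_long_task_from_library


def stop_all(procs, timeout=5.0):
    """Terminate every process that is still alive, then wait for each to exit."""
    for p in procs:
        if p.is_alive():
            p.terminate()
    for p in procs:
        p.join(timeout)               # reap the process so exitcode is set
    return [p.exitcode for p in procs]


def run_and_stop(n=2):
    """Start n long tasks, simulate a failure, and stop the tasks."""
    procs = [mp.Process(target=long_task, args=(1000,)) for _ in range(n)]
    for p in procs:
        p.start()
    try:
        raise RuntimeError("simulated failure in the main process")
    except RuntimeError:
        return stop_all(procs)
```

On a Unix-like system each entry of the returned list should equal -signal.SIGTERM. Joining after terminate() also avoids leaving zombie entries behind while the main process keeps running.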

If it doesn't make sense to continue with the main process when one or all of the subprocesses have terminated, you should handle the exit of the main program.

Hope it helps, and you can adapt bits of this for your library.