zell - 7 months ago
Python Question

Terminate a Python multiprocessing program once one of its workers meets a certain condition

I am writing a Python program using its multiprocessing module. The program calls a number of worker functions, each yielding a random number. I need to terminate the program once one of the workers has produced a number larger than 0.7.

Below is my program where the "how to do this" part is not yet filled out. Any idea? Thanks.

import time
import numpy as np
import multiprocessing as mp
import sys

def f(i):
    np.random.seed(int(time.time()+i))

    time.sleep(3)
    res=np.random.rand()
    print "From i = ",i, " res = ",res
    if res>0.7:
        print "find it"
        # terminate ???? Question: How to do this???


if __name__=='__main__':
    num_workers=mp.cpu_count()
    pool=mp.Pool(num_workers)
    for i in range(num_workers):
        p=mp.Process(target=f,args=(i,))
        p.start()

Answer

As one of the other users mentioned, the processes need to communicate with each other so that one of them can terminate its peers. You could kill the peers outright (for example, os.kill with SIGKILL), but it is more graceful to send them a termination signal (SIGTERM), which a process can catch in order to clean up before it exits.
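To illustrate what "graceful" means here, the following is a minimal sketch and not part of the original solution. It assumes Python 3 syntax, a POSIX system, and the fork start method; the names `_worker` and `stop_with_sigterm` are hypothetical. A worker that installs a SIGTERM handler gets the chance to clean up and exit normally, while a worker without a handler is simply ended by the signal.

```python
import multiprocessing as mp
import os
import signal
import sys
import time


def _worker(ready, install_handler):
    """Hypothetical worker that idles until it is told to stop."""
    if install_handler:
        def on_sigterm(signum, frame):
            # Cleanup (closing files, flushing results, ...) would go here.
            sys.exit(0)
        signal.signal(signal.SIGTERM, on_sigterm)
    ready.set()  # tell the parent that the handler (if any) is in place
    while True:
        time.sleep(0.1)


def stop_with_sigterm(install_handler, timeout=5.0):
    """Start one worker, send it SIGTERM, and return its exit code."""
    ctx = mp.get_context("fork")  # assumption: POSIX, fork is available
    ready = ctx.Event()
    p = ctx.Process(target=_worker, args=(ready, install_handler))
    p.start()
    ready.wait(timeout)
    os.kill(p.pid, signal.SIGTERM)
    p.join(timeout)
    return p.exitcode


if __name__ == "__main__":
    print("with handler:   ", stop_with_sigterm(True))
    print("without handler:", stop_with_sigterm(False))
```

With the handler, the worker exits on its own terms; without it, multiprocessing reports a negative exit code, meaning the process was ended by a signal.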

The solution I used is a pretty simple one:

1. Find out the process ID (pid) of the main process, which spawns all the worker processes. This parent-child relationship is available from the OS, which keeps track of which child process was spawned by which parent process.
2. When one of the worker processes reaches your end condition, it uses the parent process ID to find all the child processes of the main process (including itself), then goes through the list and signals each of them to end (making sure it does not signal itself).

The code below contains the working solution.

import os
import signal
import time

import multiprocessing as mp
import numpy as np
import psutil  # third-party package: pip install psutil


def f(i, main_process):
    # Seed per worker so that each process draws a different number.
    np.random.seed(int(time.time()+i))

    time.sleep(3)
    res=np.random.rand()
    current_process = os.getpid()
    print "From i = ",i, "       res = ",res, " with process ID (pid) = ", current_process
    if res>0.7:
        print "find it"
        # solution: use the parent-child connection between processes
        parent = psutil.Process(main_process)
        children = parent.children(recursive=True)
        for process in children:
            # Signal every sibling, but not ourselves.
            if process.pid != current_process:
                print "Process: ",current_process,  " killed process: ", process.pid
                process.send_signal(signal.SIGTERM)


if __name__=='__main__':
    num_workers=mp.cpu_count()
    # The pid of the main process is passed to each worker explicitly,
    # so the workers do not depend on a global inherited through fork.
    main_process = os.getpid()
    print "Main process: ", main_process
    for i in range(num_workers):
        p=mp.Process(target=f,args=(i, main_process))
        p.start()

The output gives a clear idea of what is happening:

Main process:  30249
From i =  0        res =  0.224609517693  with process ID (pid) =  30259
From i =  1        res =  0.470935062176  with process ID (pid) =  30260
From i =  2        res =  0.493680214732  with process ID (pid) =  30261
From i =  3        res =  0.342349294134  with process ID (pid) =  30262
From i =  4        res =  0.149124648092  with process ID (pid) =  30263
From i =  5        res =  0.0134122107375  with process ID (pid) =  30264
From i =  6        res =  0.719062852901  with process ID (pid) =  30265
find it
From i =  7        res =  0.663682945388  with process ID (pid) =  30266
Process:  30265  killed process:  30259
Process:  30265  killed process:  30260
Process:  30265  killed process:  30261
Process:  30265  killed process:  30262
Process:  30265  killed process:  30263
Process:  30265  killed process:  30264
Process:  30265  killed process:  30266
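
A possible alternative, sketched here under assumptions and not the approach used above: instead of letting a worker signal its siblings, the workers can report their results to the main process through a multiprocessing.Queue, and the main process terminates whatever is still running once a result exceeds the threshold. The sketch uses Python 3 syntax, the standard-library random module in place of numpy, and the fork start method (POSIX only); the names `worker` and `run_until` are hypothetical.

```python
import multiprocessing as mp
import random
import time


def worker(i, delay, results):
    """Hypothetical worker: wait, draw one number, report it to the parent."""
    rng = random.Random()  # fresh generator, seeded from OS entropy
    time.sleep(delay)
    results.put((i, rng.random()))


def run_until(threshold=0.7, num_workers=4, delay=0.1):
    """Return (worker index, value) of the first result above threshold, or None."""
    ctx = mp.get_context("fork")  # assumption: POSIX, fork is available
    results = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(i, delay, results))
             for i in range(num_workers)]
    for p in procs:
        p.start()

    winner = None
    for _ in procs:  # at most one result per worker
        i, res = results.get()
        if res > threshold:
            winner = (i, res)
            break

    # Stop whatever is still running, then reap every child.
    for p in procs:
        if p.is_alive():
            p.terminate()  # sends SIGTERM on POSIX
        p.join()
    return winner


if __name__ == "__main__":
    print("winner:", run_until())
```

The design choice here is that only the main process ever terminates anything, so no worker needs to know about its siblings and psutil is not required. The queue should not be reused after workers have been terminated, since a worker may have been interrupted while writing to it.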