FriendlyGuy - 5 months ago
Python Question

Python's multiprocessing returns more results than tasks were given

I'm currently trying to use multiprocessing for my simulation run, to evaluate different input values at the same time.

I have googled a lot over the last few weeks and put something together that is probably not very pretty, but it (somehow) works. My problem now is that it returns more output than the tasks I gave it should produce, and I don't understand why.

Sometimes each simulation run returns only one value, as expected. In the example below, however, I would expect the result of simulation run 5 to be a single value, not [23, 89]. It also varies which simulation run produces more output than expected. When I increase the number of periods to, e.g., 2, such a run generates 4 output values, but I cannot figure out why.

Could somebody please give me a hint on how to change that? I cannot find an answer and I'm getting quite frustrated :(
Any suggestions on how to improve my code would also be really appreciated, as I'm quite new to Python and I love it so far :)

This is the simplified code I use:

import numpy as np
from multiprocessing import Process, Queue
import multiprocessing
from itertools import repeat

class Simulation(Process):
    Nr = 1
    Mean = 5
    StdDev = 3
    Periods = 10
    Result = []

    def Generate_Value(self):
        GeneratedValue = max(int(round(np.random.normal(self.Mean, self.StdDev), 0)), 0)
        return GeneratedValue

    def runSimulation(self):
        for i in range(self.Periods):
            self.Result.append(self.Generate_Value())
        return self.Result

def worker(Mean, stdDev, Periods, Nr, queue):
    Sim = Simulation()
    Sim.Nr = Nr
    Sim.Periods = Periods
    Sim.Mean = Mean
    Sim.StdDev = stdDev
    Results = Sim.runSimulation()
    queue.put(Results)
    print("Simulation run " + str(Nr) + " done with a result of " + str(Results)
          + " (Input: mean: " + str(Mean) + ", std. dev.: " + str(stdDev) + ")")

if __name__ == '__main__':
    m = multiprocessing.Manager()
    queue = m.Queue()
    CPUS = multiprocessing.cpu_count() # CPUS = 8
    WORKERS = multiprocessing.Pool(processes=CPUS)

    Mean = [50, 60, 70, 80, 90]
    StdDev = [10, 10, 10, 10, 10]
    Periods = 1
    Nr = list(range(1,len(Mean) + 1))

    WORKERS.starmap(worker, zip(Mean, StdDev, repeat(Periods), Nr, repeat(queue)))
    WORKERS.close()
    WORKERS.join()

    FinalSimulationResults = []
    for i in range(len(Mean)):
        FinalSimulationResults.append(queue.get())
    print(FinalSimulationResults)


Which results in e.g. this:

Simulation run 1 done with a result of [23] (Input: mean: 50, std. dev.: 10)
Simulation run 2 done with a result of [55] (Input: mean: 60, std. dev.: 10)
Simulation run 3 done with a result of [64] (Input: mean: 70, std. dev.: 10)
Simulation run 5 done with a result of [23, 89] (Input: mean: 90, std. dev.: 10)
Simulation run 4 done with a result of [78] (Input: mean: 80, std. dev.: 10)
[[23], [55], [64], [23, 89], [78]]


It works now :). It's not as fast as I expected (only 2 times faster with 8 cores), but for everyone who might have the same problem, here's my working code:

import time
import numpy as np
from multiprocessing import Process, Queue
import multiprocessing
from itertools import repeat

class Simulation():
    def __init__(self, Nr, Mean, Std_dev, Periods):
        self.Result = []  # instance attribute: a fresh list per simulation
        self.Nr = Nr
        self.Mean = Mean
        self.StdDev = Std_dev
        self.Periods = Periods

    def Generate_Value(self):
        GeneratedValue = max(int(round(np.random.normal(self.Mean, self.StdDev), 0)), 0)
        return GeneratedValue

    def runSimulation(self):
        for i in range(self.Periods):
            self.Result.append(self.Generate_Value())
        return self.Result

def worker(Mean, stdDev, Periods, Nr, queue):
    Sim = Simulation(Nr=Nr,Mean=Mean,Std_dev=stdDev,Periods=Periods)
    Results = Sim.runSimulation()
    queue.put(Results)
    print("Simulation run " + str(Nr) + " done with a result of " + str(Results)
          + " (Input: mean: " + str(Mean) + ", std. dev.: " + str(stdDev) + ")")

if __name__ == '__main__':
    start = time.time()
    m = multiprocessing.Manager()
    queue = m.Queue()
    CPUS = multiprocessing.cpu_count()
    WORKERS = multiprocessing.Pool(processes=CPUS)

    Mean = [50, 60, 70, 80, 90]
    StdDev = [10, 10, 10, 10, 10]
    Periods = 100
    Nr = list(range(1,len(Mean) + 1))

    WORKERS.starmap(worker, zip(Mean, StdDev, repeat(Periods), Nr, repeat(queue)))
    WORKERS.close()
    WORKERS.join()

    FinalSimulationResults = []
    for i in range(len(Mean)):
        FinalSimulationResults.append(queue.get())

    print(FinalSimulationResults)
    print("Elapsed: %.2f s" % (time.time() - start))

Answer

Assigning attributes in the class body, as you do, makes them class attributes, which are shared between all instances of the class. In your case this doesn't show up immediately, because the class object itself is not shared between processes and a process often creates only one instance. But a pool worker that finishes early enough picks up another task in the same process. The new Simulation instance then appends to the same class-level Result list, which still holds the values from the earlier run. (The assignments in worker, such as Sim.Mean = Mean, create instance attributes, so only the list that is mutated in place carries over.)
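The sharing can be reproduced without multiprocessing at all. The following is a minimal sketch with a stripped-down, hypothetical Simulation class (not the full code from the question), where creating two instances one after the other stands in for a worker process that handles two tasks:

```python
class Simulation:
    Mean = 5      # class attribute (an immutable int)
    Result = []   # class attribute (a mutable list, created only once)


first = Simulation()
first.Mean = 50          # assignment creates an instance attribute on `first`
first.Result.append(23)  # mutation changes the single class-level list

second = Simulation()    # e.g. the next task handled by the same process
second.Result.append(89)

print(second.Mean)                    # 5: the class default is untouched
print(second.Result)                  # [23, 89]: 23 is left over from `first`
print(first.Result is second.Result)  # True: both names refer to one list
```

This matches the [23, 89] in the output above: the 23 belongs to an earlier run that happened to execute in the same worker process.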

To avoid this, always assign instance attributes (i.e. attributes that should differ from instance to instance) in the __init__ method:

class Simulation(Process):

    def __init__(self, nr, mean, std_dev, periods):
        super().__init__()  # initialize the Process base class
        self.nr = nr
        self.mean = mean
        self.std_dev = std_dev
        self.periods = periods
        self.result = []  # a fresh list for every instance

    def Generate_Value(self):
        GeneratedValue = max(int(round(np.random.normal(self.mean, self.std_dev), 0)), 0)
        return GeneratedValue

    def runSimulation(self):
        for i in range(self.periods):
            self.result.append(self.Generate_Value())
        return self.result

For further information see the documentation
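To watch the effect of worker reuse in an actual pool, one option is to force a pool with a single worker process, so that every task is guaranteed to run in the same process. This is only a sketch: BuggySimulation, FixedSimulation, run_buggy and run_fixed are hypothetical names introduced for illustration, and the random draw is replaced by a fixed input value to keep the output reproducible.

```python
import multiprocessing


class BuggySimulation:
    result = []  # class attribute: one list per process, shared by all instances

    def run(self, value):
        self.result.append(value)
        return list(self.result)


class FixedSimulation:
    def __init__(self):
        self.result = []  # instance attribute: a fresh list per instance

    def run(self, value):
        self.result.append(value)
        return list(self.result)


def run_buggy(value):
    # A new instance per task, as in the question's worker function.
    return BuggySimulation().run(value)


def run_fixed(value):
    return FixedSimulation().run(value)


if __name__ == "__main__":
    # One worker process, so all three tasks reuse the same process.
    with multiprocessing.Pool(processes=1) as pool:
        print(pool.map(run_buggy, [10, 20, 30]))  # [[10], [10, 20], [10, 20, 30]]
        print(pool.map(run_fixed, [10, 20, 30]))  # [[10], [20], [30]]
```

With more workers than tasks, as in the question, the buggy version only misbehaves when a worker happens to receive a second task, which is why the extra output appears in different runs each time.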

That said, I don't think you should use the Process class the way you are using it. Pool automatically handles process creation for you; you only need to tell it what to do. So, rewriting your code:

import multiprocessing
from itertools import repeat

import numpy as np


def task(nr, mean, std_dev, periods):
    # Build a fresh list on every call so nothing is shared between tasks.
    results = []
    for i in range(periods):
        results.append(max(int(round(np.random.normal(mean, std_dev), 0)), 0))
    return results


if __name__ == '__main__':
    cpu_count = multiprocessing.cpu_count() # cpu_count = 8
    pool = multiprocessing.Pool(processes=cpu_count)

    Mean = [50, 60, 70, 80, 90]
    StdDev = [10, 10, 10, 10, 10]
    Periods = 1
    Nr = list(range(1,len(Mean) + 1))

    # starmap returns the return values in input order, so no queue is needed.
    results = pool.starmap(task, zip(Nr, Mean, StdDev, repeat(Periods)))
    pool.close()
    pool.join()
    print(results)

should work (not tested).