Python Question

Multiprocessing: pool **only** the physical cores?

I have a function `foo` which consumes a lot of memory and which I would like to run several instances of in parallel.

Suppose I have a CPU with 4 physical cores, each with two logical cores.

My system has enough memory to accommodate 4 instances of `foo` in parallel but not 8.

So I want to run `foo` on all 4 physical cores. In other words, I would like to ensure that doing `multiprocessing.Pool(4)` (4 being the maximum number of concurrent runs of the function I can accommodate on this machine due to memory limitations) dispatches the job to the four physical cores (and not, for example, to a combo of two physical cores and their two logical offsprings).

How can I do that in Python?
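
For reference, counting physical vs. logical cores can be done with the third-party psutil package (a minimal sketch, assuming psutil is installed):

import multiprocessing
import psutil

print(multiprocessing.cpu_count())      # logical cores, e.g. 8
print(psutil.cpu_count(logical=False))  # physical cores, e.g. 4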

Edit:

I removed the 'and all' part of the question, which seems to have confused some of the readers. The question is really about how to dispatch a task to the physical cores only.

Edit 2:

I earlier used a code example from `multiprocessing`, but I am library agnostic, so to avoid confusion I removed it.

Answer

Note: This approach does not work on Windows and has only been tested on Linux.

Using multiprocessing.Process:

Assigning a physical core to each process is quite easy when using Process(). You can create a for loop that iterates through the cores and sets the affinity for each new process with taskset -p -c [core] [pid]:

import multiprocessing
import os

def foo():
    return

if __name__ == "__main__":
    for process_idx in range(multiprocessing.cpu_count()):
        p = multiprocessing.Process(target=foo)
        # pin the *parent* to the target core before starting the child;
        # the child inherits this affinity when p.start() forks it
        os.system("taskset -p -c %d %d" % (process_idx % multiprocessing.cpu_count(), os.getpid()))
        p.start()

I have 32 cores on my workstation, so I'll only show partial results here:

pid 520811's current affinity list: 0-31
pid 520811's new affinity list: 0
pid 520811's current affinity list: 0
pid 520811's new affinity list: 1
pid 520811's current affinity list: 1
pid 520811's new affinity list: 2
pid 520811's current affinity list: 2
pid 520811's new affinity list: 3
pid 520811's current affinity list: 3
pid 520811's new affinity list: 4
pid 520811's current affinity list: 4
pid 520811's new affinity list: 5
...

As you can see, the output shows the previous and the new affinity before each child is started. The affinity is initially all cores (0-31) and is restricted to core 0 before the first child is forked (so that child inherits it); it is then changed from core 0 to the next core (1) before the second child, and so forth.
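
If you would rather not shell out to taskset, the same pinning can be done from Python itself with os.sched_setaffinity() (Python 3.3+, Linux only). A minimal sketch, pinning each child directly after it starts rather than relying on inherited affinity:

import multiprocessing
import os

def foo():
    return

if __name__ == "__main__":
    n_cores = multiprocessing.cpu_count()
    for process_idx in range(n_cores):
        p = multiprocessing.Process(target=foo)
        p.start()
        # pin the child itself to one core, round-robin over all cores
        os.sched_setaffinity(p.pid, {process_idx % n_cores})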

Using multiprocessing.Pool:

Warning: This approach requires tweaking the pool.py module, since there is no way that I know of to extract the PIDs of the worker processes from Pool().

In pool.py, find the line where self._task_handler.start() is called. On the next line, you can assign each process in the pool to a "physical" core using the following (I put the import os here so that the reader doesn't forget to import it):

import os
# pin each worker process in the pool to one core, round-robin
for worker in range(len(self._pool)):
    p = self._pool[worker]
    os.system("taskset -p -c %d %d" % (worker % cpu_count(), p.pid))

and you're done. Test:

import multiprocessing

def foo(i):
    return

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    pool.map(foo, 'iterable here')

result:

pid 524730's current affinity list: 0-31
pid 524730's new affinity list: 0
pid 524731's current affinity list: 0-31
pid 524731's new affinity list: 1
pid 524732's current affinity list: 0-31
pid 524732's new affinity list: 2
pid 524733's current affinity list: 0-31
pid 524733's new affinity list: 3
pid 524734's current affinity list: 0-31
pid 524734's new affinity list: 4
pid 524735's current affinity list: 0-31
pid 524735's new affinity list: 5
...

Note that this modification to pool.py assigns the worker processes to the cores in a round-robin fashion, so if you create more workers than there are CPU cores, several of them will end up on the same core.
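
As an aside, if you would rather not edit pool.py at all, the worker processes can also be reached from outside through the pool's private _pool attribute. This is a CPython implementation detail rather than a public API, so treat the following only as a sketch of the same round-robin pinning:

import multiprocessing
import os

def foo(i):
    return

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    # same taskset trick as above, but applied from outside the Pool,
    # relying on the private _pool list of worker Process objects
    for worker_idx, worker in enumerate(pool._pool):
        os.system("taskset -p -c %d %d"
                  % (worker_idx % multiprocessing.cpu_count(), worker.pid))
    pool.map(foo, 'iterable here')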

EDIT:

What the OP is looking for is a Pool() that is capable of starting its workers on specific cores. For this, more tweaks to multiprocessing are needed (undo the above-mentioned changes first):

multiprocessing's pool.py:

Add an argument to __init__() by changing

def __init__(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):

to:

def __init__(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None, cores_idx=None):

You should also add the following code after self._worker_handler.start():

if cores_idx is not None:
    import os
    # pin each worker to one of the requested cores, round-robin
    for worker in range(len(self._pool)):
        p = self._pool[worker]
        os.system("taskset -p -c %d %d" % (cores_idx[worker % len(cores_idx)], p.pid))

multiprocessing's __init__.py:

Change the definition of the Pool() from:

def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
    '''
    Returns a process pool object
    '''
    from multiprocessing.pool import Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)

to :

def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, cores_idx=None):
    '''
    Returns a process pool object
    '''
    from multiprocessing.pool import Pool
    return Pool(processes, initializer, initargs, maxtasksperchild, cores_idx)

And you're done. The following example runs a pool of 5 workers on cores 0 and 2 only:

import multiprocessing


def foo(i):
    return

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=5, cores_idx=[0, 2])
    pool.map(foo, 'iterable here')

result:

pid 705235's current affinity list: 0-31
pid 705235's new affinity list: 0
pid 705236's current affinity list: 0-31
pid 705236's new affinity list: 2
pid 705237's current affinity list: 0-31
pid 705237's new affinity list: 0
pid 705238's current affinity list: 0-31
pid 705238's new affinity list: 2
pid 705239's current affinity list: 0-31
pid 705239's new affinity list: 0

Of course, you can still have the usual functionality of multiprocessing.Pool() by simply omitting the cores_idx argument.
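
Finally, to tie this back to the original question: the cores_idx you pass in should contain one logical CPU per physical core, and which logical CPUs those are depends on the hardware. A Linux-only sketch that derives such a list from the sysfs topology files (assuming they are present, as they are on typical Linux systems):

import glob
import os
import re

def one_cpu_per_physical_core():
    """Return one logical CPU index per physical core, based on Linux sysfs."""
    cpu_dirs = glob.glob("/sys/devices/system/cpu/cpu[0-9]*")
    # sort numerically (a plain string sort would put cpu10 before cpu2)
    cpu_dirs.sort(key=lambda d: int(re.search(r"cpu(\d+)$", d).group(1)))
    chosen = {}
    for d in cpu_dirs:
        cpu = int(re.search(r"cpu(\d+)$", d).group(1))
        with open(os.path.join(d, "topology", "core_id")) as f:
            core_id = int(f.read())
        with open(os.path.join(d, "topology", "physical_package_id")) as f:
            package_id = int(f.read())
        # keep only the lowest-numbered logical CPU of each physical core
        chosen.setdefault((package_id, core_id), cpu)
    return sorted(chosen.values())

# e.g. on a 4-core/8-thread CPU this might print [0, 1, 2, 3], which could
# then be passed as cores_idx=... to the patched Pool above
print(one_cpu_per_physical_core())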
