
Missing lines when writing file with multiprocessing Lock Python

This is my code:

from multiprocessing import Pool, Lock
from datetime import datetime as dt

console_out = "/STDOUT/Console.out"
chunksize = 50
lock = Lock()

def writer(message):
    with lock:
        with open(console_out, 'a') as out:
            out.write(message)

def conf_wrapper(state):
    import ProcessingModule as procs
    import sqlalchemy as sal

    stcd, nrows = state
    engine = sal.create_engine('postgresql://foo:bar@localhost:5432/schema')

    writer("State {s} started at: {n}"
           "\n".format(s=str(stcd).zfill(2), n=dt.now()))

    with engine.connect() as conn, conn.begin():
        procs.processor(conn, stcd, nrows, chunksize)

    writer("\tState {s} finished at: {n}"
           "\n".format(s=str(stcd).zfill(2), n=dt.now()))

def main():
    nprocesses = 12
    maxproc = 1
    state_list = [(2, 113), (10, 119), (15, 84), (50, 112), (44, 110), (11, 37), (33, 197)]

    with open(console_out, 'w') as out:
        out.write("Starting at {n}\n".format(n=dt.now()))
        out.write("Using {p} processes..."
                  "\n".format(p=nprocesses))

    with Pool(processes=int(nprocesses), maxtasksperchild=maxproc) as pool:
        pool.map(func=conf_wrapper, iterable=state_list, chunksize=1)

    with open(console_out, 'a') as out:
        out.write("\nAll done at {n}".format(n=dt.now()))

if __name__ == '__main__':
    main()

The file never contains all 7 states; it always misses one or more states. Here is the output from the latest run:

Starting at 2016-07-27 21:46:58.638587
Using 12 processes...
State 44 started at: 2016-07-27 21:47:01.482322
State 02 started at: 2016-07-27 21:47:01.497947
State 11 started at: 2016-07-27 21:47:01.529198
State 10 started at: 2016-07-27 21:47:01.497947
State 11 finished at: 2016-07-27 21:47:15.701207
State 15 finished at: 2016-07-27 21:47:24.123164
State 44 finished at: 2016-07-27 21:47:32.029489
State 50 finished at: 2016-07-27 21:47:51.203107
State 10 finished at: 2016-07-27 21:47:53.046876
State 33 finished at: 2016-07-27 21:47:58.156301
State 02 finished at: 2016-07-27 21:48:18.856979

All done at 2016-07-27 21:48:18.992277


Note, OS is Windows Server 2012 R2.


Since you're running on Windows, nothing is inherited by worker processes. Each process runs the entire main program "from scratch".

In particular, with the code as written every process has its own instance of lock, and these instances have nothing to do with each other. In short, lock isn't supplying any inter-process mutual exclusion at all.
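A quick way to see this (a hypothetical demonstration, not code from the question): have two workers hold the module-level lock at the same time. Under Windows' spawn start method the two hold windows overlap, because each worker built its own private Lock when it re-ran the module:

import time
from datetime import datetime as dt
from multiprocessing import Pool, Lock

lock = Lock()  # re-created independently in every spawned worker

def hold(i):
    with lock:  # acquires that worker's *private* lock
        start = dt.now()
        time.sleep(2)
        return i, start, dt.now()

if __name__ == '__main__':
    with Pool(2) as pool:
        for i, start, end in pool.map(hold, range(2)):
            print(i, start, end)  # the two [start, end] windows overlap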

To fix this, pass a once-per-process initialization function to the Pool constructor, and hand it a single Lock() instance. For example:

def init(L):
    global lock
    lock = L

and then add these arguments to the Pool() constructor:

initializer=init, initargs=(Lock(),),

And you no longer need the module-level:

lock = Lock()


Then the inter-process mutual exclusion will work as intended.
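Putting the pieces together, a minimal sketch of the corrected skeleton (the work() function here is a hypothetical stand-in for the question's conf_wrapper):

from multiprocessing import Pool, Lock

console_out = "/STDOUT/Console.out"

def init(L):
    # Runs once in each pool worker; installs the one shared Lock as a global.
    global lock
    lock = L

def writer(message):
    with lock:  # now genuinely excludes all other workers
        with open(console_out, 'a') as out:
            out.write(message)

def work(item):
    writer("processed {i}\n".format(i=item))

if __name__ == '__main__':
    with Pool(processes=12, maxtasksperchild=1,
              initializer=init, initargs=(Lock(),)) as pool:
        pool.map(work, range(7))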


If you'd like to delegate all output to a dedicated writer process, you could skip the lock and use a queue instead to feed that process (see later for a different version):

def writer_process(q):
    with open(console_out, 'w') as out:
        while True:
            message = q.get()
            if message is None:
                break
            out.write(message)
            out.flush() # can't guess whether you really want this

and change writer() to just:

def writer(message):
    q.put(message)

You would again need to use initializer= and initargs= in the Pool constructor so that all processes use the same queue.

Only one process should run writer_process(), and that can be started on its own as an instance of multiprocessing.Process.

Finally, to let writer_process() know it's time to drain the queue and return, just run

q.put(None)

in the main process.
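For concreteness, here is a minimal hedged sketch of how the pieces might be wired together, reusing conf_wrapper and state_list from the question (the init() helper mirrors the lock version above):

from multiprocessing import Pool, Process, Queue

def init(q_):
    # Runs once in each pool worker; installs the shared queue as a global.
    global q
    q = q_

if __name__ == '__main__':
    queue = Queue()
    w = Process(target=writer_process, args=(queue,))
    w.start()  # the single dedicated writer process
    with Pool(processes=12, maxtasksperchild=1,
              initializer=init, initargs=(queue,)) as pool:
        pool.map(func=conf_wrapper, iterable=state_list, chunksize=1)
    queue.put(None)  # sentinel: writer_process drains the queue and returns
    w.join()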


The OP settled on this version instead, because they needed to open the output file in other code simultaneously:

def writer_process(q):
    while True:
        message = q.get()
        if message == 'done':
            break
        elif message is not None:
            with open(console_out, 'a') as out:
                out.write(message)

I don't know why the terminating sentinel was changed to "done". Any unique value works for this; None is traditional.