LearningSlowly - 3 months ago
Python Question

multiprocessing - calling function with different input files

I have a function which reads in a file, compares each record in that file to a record in another file and, depending on a rule, appends the record to one of two lists.

I have an empty list for adding matched results to:

match = []


I have a list, restrictions, that I want to compare records in a series of files with.

I have a function for reading in each file I wish to check for matches. If there is a match, I append the record to the match list.

def link_match(file):
    with open(file) as f:  # file is a path, so open it before parsing
        links = json.load(f)
    for link in links:
        found = False
        for other_link in other_links:  # records from the comparison file
            if link['data'] == other_link['data']:
                match.append(link)
                found = True
        if not found:
            print("not found")


I have numerous files that I wish to compare and I thus wish to use the multiprocessing library.

I create a list of file names to act as function arguments:

list_files = []
for file in glob.glob("/path/*.json"):
    list_files.append(file)


I then use the map feature to call the function with the different input files:

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    pool.map(link_match, list_files)
    pool.close()
    pool.join()


CPU use goes through the roof, and by adding a print line to the function loop I can see that matches are being found and the function is behaving correctly.

However, the match results list remains empty. What am I doing wrong?

Answer

When multiprocessing, each subprocess gets its own copy of any global variables defined in the main script. This means that the link_match() function in each one of them is accessing a different match list.
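To see this in isolation, here is a minimal sketch (the names worker and match are illustrative, not from the question): each subprocess appends to its own copy of the global list, so the parent's list never changes.

```python
import multiprocessing

match = []  # global in the parent process

def worker(n):
    # each subprocess operates on its own copy of the module globals,
    # so this append only modifies the child's copy of `match`
    match.append(n)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=2)
    pool.map(worker, [1, 2, 3])
    pool.close()
    pool.join()
    print(match)  # the parent's list is still empty: []
```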

One workaround is to use a shared list, which requires a SyncManager (created by calling multiprocessing.Manager()) to synchronize access to the shared resource among the processes. The manager is then used to create the list that stores the results (which I have named matches instead of match in the code below).

I also had to use functools.partial() to create a single-argument callable out of the revised link_match() function, which now takes two arguments rather than one (pool.map() expects a function of one argument).
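For reference, functools.partial() freezes the leading arguments of a function and returns a callable that accepts the remaining ones (the names below are illustrative):

```python
from functools import partial

def record(results, item):
    results.append(item)

collected = []
# freeze the first argument; the result is a one-argument callable,
# which is the shape pool.map() expects
record_to = partial(record, collected)
record_to('a')
record_to('b')
print(collected)  # ['a', 'b']
```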

from functools import partial
import glob
import multiprocessing

def link_match(matches, file):  # note: added results list argument
    with open(file) as f:  # pool.map() passes a filename, so open it first
        links = json.load(f)
    for link in links:
        found = False
        for other_link in other_links:  # records from the comparison file
            if link['data'] == other_link['data']:
                matches.append(link)
                found = True
        if not found:
            print("not found")

list_files=[]
for file in glob.glob("*.json"):
    list_files.append(file)

if __name__ == '__main__':
    manager = multiprocessing.Manager()  # create SyncManager
    matches = manager.list()  # create a shared list
    link_matches = partial(link_match, matches)  # create one arg callable to
                                                 # pass pool.map()

    pool = multiprocessing.Pool(processes=6)
    pool.map(link_matches, list_files)  # apply partial to files list
    pool.close()
    pool.join()
    print(matches)