bublitz bublitz - 1 year ago 147
Python Question

Python: Process file using multiple cores

I am currently trying to read a large file (80 million lines), where I need to make a computationally intensive matrix multiplication for each entry. After calculating this, I want to insert the result into a database. Because of the time intensive manner of this process, I want to split the file onto multiple cores to speed up the process.

After researching I found this promising attempt, which split a file into n parts.

def file_block(fp, number_of_blocks, block):
'''
A generator that splits a file into blocks and iterates
over the lines of one of the blocks.

'''

assert 0 <= block and block < number_of_blocks
assert 0 < number_of_blocks

fp.seek(0,2)
file_size = fp.tell()

ini = file_size * block / number_of_blocks
end = file_size * (1 + block) / number_of_blocks

if ini <= 0:
fp.seek(0)
else:
fp.seek(ini-1)
fp.readline()

while fp.tell() < end:
yield fp.readline()


Iteratively, you can call the function like this:

if __name__ == '__main__':
fp = open(filename)
number_of_chunks = 4
for chunk_number in range(number_of_chunks):
print chunk_number, 100 * '='
for line in file_block(fp, number_of_chunks, chunk_number):
process(line)


While this works, I run into problems, parallelizing this using multiprocessing:

fp = open(filename)
number_of_chunks = 4
li = [file_block(fp, number_of_chunks, chunk_number) for chunk_number in range(number_of_chunks)]

p = Pool(cpu_count() - 1)
p.map(processChunk,li)


With the error being, that generators cannot be pickled.

While I understand this error, it is too expensive to first iterate over the whole file to put all lines into a list.

Moreover, I want to use blocks of lines per core per iteration, because it is more efficient to insert multiple lines into the database at once (instead of 1 by 1 if using the typical map approach)

Thanks for your help.

Answer Source

Instead of creating generators up front and passing them into each thread, leave that to the thread code.

def processChunk(params):
    filename, chunk_number, number_of_chunks = params
    with open(filename, 'r') as fp:
        for line in file_block(fp, number_of_chunks, chunk_number):
            process(line)

li = [(filename, i, number_of_chunks) for i in range(number_of_chunks)]
p.map(processChunk, li)