
Python not freeing RAM while processing large file in parallel

I have a 25 GB plaintext file with ~10 million lines and several hundred words per line. Each line needs to be processed individually, and I'm trying to hand off chunks to a dozen workers to be processed in parallel. I'm currently loading a million lines at a time (for some reason this takes up ~10 GB of RAM even though it's only ~3 GB uncompressed on disk), splitting it evenly 12 ways, and then mapping it to 12 workers using multiprocessing.Pool.

The problem is that when each of my 12 workers finishes processing its allocated data, its RAM is not freed, and memory usage just grows by another ~10 GB on the next million-line iteration.

I've tried del'ing the previous data, resetting the previous data to an empty allocation, creating iterable variable names with eval(), calling gc.collect() after deletion, and separating the I/O entirely into its own function, all with no luck and the exact same issue. Running the debugger shows that the Python interpreter only sees the expected data, and data from the previous iteration is not accessible, so why isn't the RAM actually being freed?

The code below is my latest attempt at separating all the environments. It's not the most efficient, but "BigFileOnDisk" is on an SSD, so re-reading the file each time is negligible compared to actually processing the data. I previously had the "read" functionality inside the allocation function and deleted all data after the workers finished, with the same results.

def allocation():
    fileCompleted = False
    currentLine = 0
    while not fileCompleted:
        lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine)
        list_of_values(function_object=worker, inputs=lineData, workers=12)


def read(numLines, startLine=0):
    currentLine = 0
    lines = []
    with open(BigFileOnDisk, 'r') as fid:
        for line in fid:
            if currentLine >= startLine:
                lines.append(line)
                if currentLine - startLine >= numLines:
                    return lines, currentLine, False
            currentLine += 1
    # or if we've hit the end of the file
    return lines, currentLine, True


def worker(lines):
    outputPath = *root* + str(datetime.datetime.now().time())
    processedData = {}

    for line in lines:
        # process data

    del lines
    with open(outputPath, 'a') as fid:
        for item in processedData:
            fid.write(str(item) + ', ' + str(processedData[item]) + '\n')


def list_of_values(function_object, inputs, workers = 10):
    inputs_split = []
    subsection_start = 0
    for n in range(workers):
        start = int(subsection_start)
        end = int(subsection_start + len(inputs) / workers)
        subsection_start = end

        inputs_split.append( inputs[start:end] )

    p = Pool(workers)
    p.map(function_object, inputs_split)

Answer

You are not joining the sub-processes. After list_of_values finishes, the processes created by Pool are still alive (somewhat like zombies, but with a living parent process). They still hold all of their values. You can't see their data in the main process because it lives in other processes (and for the same reason gc.collect() in the main process doesn't help).
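
You can see this for yourself with multiprocessing.active_children(); this is just an illustrative sketch, not code from the question:

    import multiprocessing
    from multiprocessing import Pool

    def square(x):
        return x * x

    if __name__ == '__main__':
        p = Pool(4)
        p.map(square, range(100))
        # map() has returned, but the pool's worker processes are still alive
        print(multiprocessing.active_children())   # four ForkPoolWorker/SpawnPoolWorker entries
        p.close()
        p.join()
        print(multiprocessing.active_children())   # []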

To free the memory allocated by the workers, you need to close and join the Pool manually, or use it as a context manager with with.

def list_of_values(function_object, inputs, workers = 10):
    inputs_split = []
    subsection_start = 0
    for n in range(workers):
        start = int(subsection_start)
        end = int(subsection_start + len(inputs) / workers)
        subsection_start = end

        inputs_split.append( inputs[start:end] )

    with Pool(workers) as p:
        p.map(function_object, inputs_split)
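
If you'd rather not use the context manager, the equivalent explicit cleanup is to close() and then join() the pool. For example, the last two lines of list_of_values could instead be (a sketch, same variable names as above):

    p = Pool(workers)
    try:
        p.map(function_object, inputs_split)
    finally:
        p.close()   # no more tasks will be submitted to the pool
        p.join()    # wait for the worker processes to exit and be reaped

Either way, the worker processes exit once the chunk is handled, so their memory is returned to the OS before the next million-line batch is read.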