
Threading queue hangs in Python

I am trying to make a parser multi-threaded via a Queue. It seems to work, but my Queue is hanging. I'd appreciate it if someone could tell me how to fix this, since I have rarely written multi-threaded code.

This code reads from the Q:

from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread

l = []
q = Queue.Queue()

def parse_record():
    d = {}
    while not q.empty():
        rec = q.get()
        d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
        # ... many ops like this
        d['dport'] = rec.dport
        l.append(d)  # l is global


And this fills the Q:

def parse_records():
    ffile = '/tmp/query.rwf'
    flows = SilkFile(ffile, READ)
    numthreads = 2

    # fill queue
    for rec in flows:
        q.put(rec)
    # work on Queue
    for i in range(numthreads):
        t = Thread(target=parse_record)
        t.daemon = True
        t.start()

    # blocking
    q.join()

    # never reached
    data_df = pandas.DataFrame.from_records(l)
    return data_df


I only call parse_records() in my main. It never terminates.

Answer

The Queue.empty doc says:

...if empty() returns False it doesn’t guarantee that a subsequent call to get() will not block.

At a minimum you should use get_nowait, or a get() that races with the other worker thread can block forever. But more importantly, join() will only release when all of the queued items have been marked complete with a Queue.task_done call:

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
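For instance, a minimal standalone sketch of this contract, using a toy queue of integers instead of silk records:

import Queue
from threading import Thread

toy_q = Queue.Queue()
for i in range(5):
    toy_q.put(i)

def worker():
    while True:
        try:
            item = toy_q.get_nowait()
        except Queue.Empty:
            return
        # ... process item here ...
        toy_q.task_done()  # mark the item complete; join() counts these

Thread(target=worker).start()
toy_q.join()  # returns only after five task_done() calls, one per put()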

As a side note, l.append(d) being thread-safe without a lock is a CPython implementation detail rather than a documented guarantee, so it is safer to protect it with a lock.

from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread, Lock

l = []
l_lock = Lock()
q = Queue.Queue()

def parse_record():
    while True:
        try:
            rec = q.get_nowait()
        except Queue.Empty:
            return
        d = {}  # fresh dict per record, otherwise every list entry is the same object
        d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
        # ... many ops like this
        d['dport'] = rec.dport
        with l_lock:
            l.append(d)  # l is global
        q.task_done()
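With that in place, the producer side can stay essentially as you wrote it, and q.join() now returns:

def parse_records():
    ffile = '/tmp/query.rwf'
    flows = SilkFile(ffile, READ)
    numthreads = 2

    # fill queue before starting the workers, as in the original
    for rec in flows:
        q.put(rec)
    for i in range(numthreads):
        t = Thread(target=parse_record)
        t.daemon = True
        t.start()

    q.join()  # releases once every record has a matching task_done()

    data_df = pandas.DataFrame.from_records(l)
    return data_df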

You could shorten your code considerably by using a thread pool from the standard library.

from silk import *
import json
import datetime
import pandas
import multiprocessing.pool

def parse_record(rec):
    d = {}
    d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
    # ... many ops like this
    d['dport'] = rec.dport
    return d

def parse_records():
    ffile = '/tmp/query.rwf'
    flows = SilkFile(ffile, READ)
    pool = multiprocessing.pool.ThreadPool(2)
    data_df = pandas.DataFrame.from_records(pool.map(parse_record, flows))
    pool.close()
    return data_df
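Note that multiprocessing.pool.ThreadPool offers the same map interface as Pool but runs its workers as threads in the current process, so the silk record objects never have to be pickled and sent to worker processes; with a process Pool(2) every rec would need to survive that round trip.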