hazrmard hazrmard - 3 months ago 10
Python Question

Why am I unable to join this thread in python?

I am writing a multithreading class. The class has a parallel_process() function that is overridden with the parallel task. The data to be processed is put in the queue. The worker() function in each thread keeps calling parallel_process() until the queue is empty. Results are put in the results Queue object. The class definition is:

import threading
try:
    from Queue import Queue
except ImportError:
    from queue import Queue


class Parallel:

    def __init__(self, pkgs, common=None, nthreads=1):
        self.nthreads = nthreads
        self.threads = []
        self.queue = Queue()
        self.results = Queue()
        self.common = common
        for pkg in pkgs:
            self.queue.put(pkg)

    def parallel_process(self, pkg, common):
        pass

    def worker(self):
        while not self.queue.empty():
            pkg = self.queue.get()
            self.results.put(self.parallel_process(pkg, self.common))
            self.queue.task_done()
        return

    def start(self):
        for i in range(self.nthreads):
            t = threading.Thread(target=self.worker)
            t.daemon = False
            t.start()
            self.threads.append(t)

    def wait_for_threads(self):
        print('Waiting on queue to empty...')
        self.queue.join()
        print('Queue processed. Joining threads...')
        for t in self.threads:
            t.join()
            print('...Thread joined.')

    def get_results(self):
        results = []
        print('Obtaining results...')
        while not self.results.empty():
            results.append(self.results.get())
        return results


I use it to create a parallel task:

class myParallel(Parallel):  # return square of numbers in a list
    def parallel_process(self, pkg, common):
        return pkg**2

p = myParallel(range(50),nthreads=4)
p.start()
p.wait_for_threads()
r = p.get_results()
print('FINISHED')


However, not all threads join every time the code is run. Sometimes only 2 join, sometimes no thread joins. I do not think I am blocking the threads from finishing. What reason could there be for join() to not work here?

Answer

This statement may lead to errors:

while not self.queue.empty():
    pkg = self.queue.get()

With multiple threads pulling items from the queue, there's no guarantee that an item is still available when self.queue.get() is called, even if you check that the queue is non-empty beforehand: another thread can take the last item between the check and the get. Here is a possible scenario:

  1. Thread 1 checks the queue, finds it is not empty, and control proceeds into the while loop.
  2. Control passes to Thread 2, which also checks the queue, finds it is not empty and enters the while loop. Thread 2 gets an item from the queue. The queue is now empty.
  3. Control passes back to Thread 1, which calls get() on the now-empty queue. No Empty exception is raised, because get() blocks by default: Thread 1 waits forever for an item that never arrives, so the thread never finishes and join() never returns.
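The scenario above can be forced deterministically. The following is a minimal sketch for Python 3, not part of the original code: the function name demonstrate_race is hypothetical, a threading.Barrier makes both threads pass the empty() check before either one calls get(), and a timeout stands in for the indefinite block so that the example terminates.

```python
import threading
from queue import Queue, Empty


def demonstrate_race():
    """Force two threads past the empty() check before either calls get()."""
    q = Queue()
    q.put("only item")
    barrier = threading.Barrier(2)  # released once both threads have checked
    outcomes = []                   # list.append is atomic in CPython

    def worker():
        if not q.empty():           # both threads see a non-empty queue
            barrier.wait()          # wait until the other thread has checked too
            try:
                # Assumption for the demo: a timeout replaces the plain get(),
                # which would otherwise block this thread forever.
                outcomes.append(q.get(timeout=0.5))
            except Empty:
                outcomes.append("would block forever")

    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(outcomes)


if __name__ == "__main__":
    print(demonstrate_race())
```

One thread receives the item and the other finds nothing to take. With the plain get() used in the question, that second thread would simply never return.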

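Instead, fetch items with a non-blocking get inside a try/except, and leave the worker loop once the queue is empty:

try:
    pkg = self.queue.get_nowait()
except Empty:  # import Empty from the same module as Queue
    break      # queue drained: leave the enclosing "while True:" loop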
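Put together, a corrected version of the class might look like the sketch below. It assumes Python 3 (on Python 2 the import would come from the Queue module), drops the progress prints for brevity, and uses a hypothetical subclass name, Squares, in place of the one from the question. Wrapping task_done() in a finally block is an added precaution so that queue.join() cannot hang if parallel_process() raises.

```python
import threading
from queue import Queue, Empty


class Parallel:
    def __init__(self, pkgs, common=None, nthreads=1):
        self.nthreads = nthreads
        self.threads = []
        self.queue = Queue()
        self.results = Queue()
        self.common = common
        for pkg in pkgs:
            self.queue.put(pkg)

    def parallel_process(self, pkg, common):
        pass

    def worker(self):
        while True:
            try:
                # Never blocks: raises Empty if another thread took the last item
                pkg = self.queue.get_nowait()
            except Empty:
                return
            try:
                self.results.put(self.parallel_process(pkg, self.common))
            finally:
                self.queue.task_done()

    def start(self):
        for _ in range(self.nthreads):
            t = threading.Thread(target=self.worker)
            t.start()
            self.threads.append(t)

    def wait_for_threads(self):
        self.queue.join()       # every item has been marked done
        for t in self.threads:
            t.join()            # every worker has returned

    def get_results(self):
        results = []
        while True:
            try:
                results.append(self.results.get_nowait())
            except Empty:
                return results


class Squares(Parallel):  # hypothetical name: squares each number
    def parallel_process(self, pkg, common):
        return pkg ** 2


if __name__ == "__main__":
    p = Squares(range(50), nthreads=4)
    p.start()
    p.wait_for_threads()
    print(sorted(p.get_results())[:5])
```

Results arrive in whatever order the threads finish, so sort them or attach an index to each item if the order matters.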