André Fratelli André Fratelli - 25 days ago 7
Python Question

Faking an insertion in Python Queue

I'm using a Queue in python, and it works something like a channel. That is, whenever you make an insertion, some other thread is waiting and fetches the inserted value. That value is then yield.

@classsynchronized('mutex')
def send(self, message):

# This might raise Full
if not self._is_closed:
self._queue.put(message)
return True

return False

@classsynchronized('mutex')
def close(self):

# Don't do what we don't need to
if self._is_closed:
return

# Make the _queue.get() request fail with an Empty exception
# This will cause the channel to stop listenning to messages
# First aquire the write lock, then notify the read lock and
# finally release the write lock. This is equivalent to an
# empty write, which will cause the Empty exception
print("ACQUIRING not_full")
self._queue.not_full.acquire()

# Close first. If the queue is empty it will raise Empty as fast as
# possible, instead of waiting for the timeout
self._is_closed = True

try:
print("NOTIFYING not_empty")
self._queue.not_empty.notify()
print("NOTIFIED not_empty")
finally:
self._queue.not_full.release()
print("RELEASED not_full")

def _yield_response(self):
try:
while True:

# Fetch from the queue, wait until available, or a timeout
timeout = self.get_timeout()
print("[WAITING]")
message = self._queue.get(True, timeout)
print("[DONE WAITING] " + message)
self._queue.task_done()

# Don't yield messages on closed queues, flush instead
# This prevents writting to a closed stream, but it's
# up to the user to close the queue before closing the stream
if not self._is_closed:
yield message

# The queue is closed, ignore all remaining messages
# Allow subclasses to change the way ignored messages are handled
else:
self.handle_ignored(message)

# This exception will be thrown when the channel is closed or
# when it times out. Close the channel, in case a timeout caused
# an exception
except Empty:
pass

# Make sure the channel is closed, we can get here by timeout
self.close()

# Finally, empty the queue ignoring all remaining messages
try:
while True:
message = self._queue.get_nowait()
self.handle_ignored(message)

except Empty:
pass


I only included the relevant methods, but notice this is a class. The thing is, this does not behave as I expected. The queue does get closed, all the prints show in the console, but the thread waiting for messages does not get notifyied. Instead, it always exits with a timeout.

All @classsynchronized('mutex') annotations synchronize the methods with the same identifier ('mutex') class-wise, that is, every method in a class with that annotation with the same ID is synchronized with each other.

The reason I acquire the not_full lock before closing is to prevent inserting in a closed channel. Only then do I notify the not_empty lock.

Any idea why this doesn't work? Any other suggestions?

Thanks in advance.

Edit:

I made a few changes to the prints. I create the channel and immediately send a message. Then I send an HTTP request for deleting it. This is the output:

[WAITING]
[DONE WAITING] message
[WAITING]
ACQUIRING not_full
NOTIFYING not_empty
NOTIFIED not_empty
RELEASE not_full


So:


  1. The first message gets processed and successfully dispatched (I get it in the client, so...)

  2. Then the queue is waiting. It should be waiting on the not_empty lock, right?

  3. The I issue a DELETE request for the channel. It acquires the not_full lock (to prevent writes), and notifies the not_empty lock.



I really don't get it... If the thread gets notified why does it not unblock??

Answer

It seems like a bad idea to tamper with Queue's internal locks. How about formulating the problem in terms of only the official interface of Queue?

To emulate closing a queue, for example, you do self._queue.put(None) or some other special value. The waiting thread getting this special value know that the queue has been closed. The problem is that the special value is then no longer in the queue for potentially more threads to see; but this is easily fixed: when a thread gets the special value, it puts it again into the queue immediately.