Justin Thomas Justin Thomas - 8 days ago 7
Python Question

Never ending message loop: Same message redelivered in python rabbitmq consumer

I am using the Example consumer posted here:

http://pika.readthedocs.org/en/latest/examples/asynchronous_consumer_example.html

The reason I was using ExampleConsumer was my connection to rabbitmq was failing when the work tasks started taking longer, where longer is greater than 10 minutes. The connection was saying closed after the long running task completed and the process was failing. It previously went through 1000 messages that took a minute or so fine.

ExampleConsumer seems to reconnect fine, however, in the acknowledge message the message isn't actually acknowledged because the connection is dead. It seems to return normally from the below acknowledge message method. It then attempts reconnection after which the message that was just finished gets redelivered.

def acknowledge_message(self, delivery_tag):
"""Acknowledge the message delivery from RabbitMQ by sending a
Basic.Ack RPC method for the delivery tag.

:param int delivery_tag: The delivery tag from the Basic.Deliver frame

"""
LOGGER.info('Acknowledging message %s', delivery_tag)
self._channel.basic_ack(delivery_tag)

Answer

RabbitMQ broker implements a default heartbeat timeout that, depending on the RabbitMQ version, is either ~ 10 minutes or 1 minute; the shorter default is in the more recent versions, beginning with RabbitMQ v3.5.5. The application can pass an explicit longer heartbeat timeout preference via connection parameters. Pika's SelectConnection doesn't have a background thread, so when the work task takes longer than the heartbeat timeout, SelectConnection is unable to service the heartbeats within the time limits expected by the broker, and the broker drops the connection. There are different ways you can try to get around this issue:

  1. Set a longer heartbeat timeout preference via pika.connection.ConnectionParameters (probably the easiest). ConnectionParameters.heartbeat_interval=0 is supposed to disable heartbeats (and heartbeat timeouts) altogether.
  2. Run the connection on a separate thread from the task-processing logic
  3. Switch to one of the cooperative multitasking connection types, such as Tornado or Twisted framework-based adapters in Pika, or gevent-based adapter in Haigha. This change would require the task processing logic to be friendly to cooperative multitasking.
Comments