I am using the Example consumer posted here:
I was using ExampleConsumer because my connection to RabbitMQ started failing once the work tasks took longer than 10 minutes. After a long-running task completed, the connection was reported as closed and the process failed. It had previously worked through 1000 messages that took a minute or so without problems.
ExampleConsumer seems to reconnect fine. However, in acknowledge_message the message isn't actually acknowledged because the connection is dead, even though the method below appears to return normally. It then attempts reconnection, after which the message that had just finished gets redelivered.
def acknowledge_message(self, delivery_tag):
    """Acknowledge the message delivery from RabbitMQ by sending a
    Basic.Ack RPC method for the delivery tag.

    :param int delivery_tag: The delivery tag from the Basic.Deliver frame

    """
    LOGGER.info('Acknowledging message %s', delivery_tag)
    # Send the Basic.Ack on the channel the message was delivered on.
    self._channel.basic_ack(delivery_tag)
The RabbitMQ broker applies a default heartbeat timeout that, depending on the RabbitMQ version, is either about 10 minutes or 1 minute; the shorter default applies to the more recent versions, beginning with RabbitMQ v3.5.5. The application can request a longer heartbeat timeout explicitly via the connection parameters. Pika's SelectConnection doesn't have a background thread, so while a work task is running the connection cannot send or process heartbeats. When the task takes longer than the heartbeat timeout, SelectConnection misses the deadline the broker expects, and the broker drops the connection. There are different ways you can try to get around this issue:
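As an illustration of the heartbeat preference mentioned above, here is a minimal sketch rather than a definitive fix. It assumes pika 1.x, where the keyword is heartbeat (older 0.x releases called it heartbeat_interval), and a broker on localhost. The names pick_heartbeat and make_parameters and the safety factor of 2 are hypothetical, chosen only for this example.

```python
def pick_heartbeat(longest_task_seconds, safety_factor=2):
    """Hypothetical helper: return a heartbeat timeout in seconds that
    comfortably exceeds the longest expected work task."""
    return int(longest_task_seconds * safety_factor)


def make_parameters(longest_task_seconds, host='localhost'):
    """Build connection parameters that request a longer heartbeat timeout.

    Assumes pika 1.x; on 0.x releases the keyword was heartbeat_interval.
    """
    import pika  # imported lazily so pick_heartbeat works without pika

    return pika.ConnectionParameters(
        host=host,
        heartbeat=pick_heartbeat(longest_task_seconds),
    )


if __name__ == '__main__':
    # A 10-minute task would ask for a 20-minute heartbeat timeout.
    print(pick_heartbeat(600))
```

The resulting parameters object would be passed to pika.SelectConnection in place of the ones ExampleConsumer builds from its URL. A much longer timeout also means that a genuinely dead connection takes longer to detect.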