Rumpli Rumpli -5 years ago 230
Python Question

Pika: Consume the next message even if the last message was not acknowledged

For server automation, we're trying to develop a tool that can handle and execute a lot of tasks on different servers. We send the task and the server hostname into a queue. The queue is then consumed by a requester, which passes the information to the ansible API. To be able to execute more than one task at once, we're using threading.

Now we're stuck on acknowledging the messages...

What we have done so far:

The requester consumes the queue and then starts a thread in which the ansible task runs. The result is then sent into another queue. So each new message creates a new thread. Once the task is finished, the thread dies.

But now comes the difficult part. We have to make the messages persistent in case our server dies. So each message should be acknowledged only after the result from ansible has been sent back.

Our problem now is that when we try to acknowledge the message in the thread itself, no work is done simultaneously any more, because pika's consumer waits for the acknowledgement. So how can we get the consumer to keep consuming messages without waiting for the acknowledgement? Or how can we work around this or improve our little program?


from worker import *
import ansible.inventory
import ansible.runner
import threading

class Requester(Worker):
    def __init__(self):
        self.connection(self.selfhost, self.from_db)

    def send(self, result, ch, method):
        # ... (publishing the result to the other queue is elided in the original)
        print "[x] Sent \n" + result
        # acknowledge only after the result has been sent back
        ch.basic_ack(delivery_tag = method.delivery_tag)

    def callAnsible(self, cmd, ch, method):
        #call ansible api pre 2.0
        # ... (the ansible call that produces `result` is elided in the original)
        result = json.dumps(result, sort_keys=True, indent=4, separators=(',', ': '))
        self.send(result, ch, method)

    def callback(self, ch, method, properties, body):
        print(" [x] Received by requester %r" % body)
        # one thread per message; the thread dies when the task is finished
        t = threading.Thread(target=self.callAnsible, args=(body,ch,method,))
        t.start()

import pika
import ConfigParser
import json
import os

class Worker(object):
    def __init__(self):
        #read some config files
        pass  # body elided in the original

    def callback(self, ch, method, properties, body):
        raise Exception("Call method in subclass")

    def receive(self, queue):
        # ... ,queue=queue)  (call truncated in the original)
        pass

    def connection(self,server,queue):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            # ... (other parameters elided in the original)
            credentials=self.credentials))
        # ... =, durable=True)  (statement truncated in the original)

We're working with Python 2.7 and pika 0.10.0.

And yes, we noticed in the pika FAQ that pika is not thread safe.
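
One common way to live with that constraint is to keep every channel call in the thread that owns the connection and let the worker threads only report which messages are done. The following is a minimal sketch under that assumption, not the code from the question: `run_task`, `drain_acks`, and the `ack_queue` argument are hypothetical names, and `handler` stands in for the ansible call.

```python
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2.7, as used in the question


def run_task(body, delivery_tag, handler, ack_queue):
    """Runs in a worker thread and never touches the pika channel."""
    handler(body)                 # placeholder for the ansible call
    ack_queue.put(delivery_tag)   # hand the tag to the connection thread


def drain_acks(channel, ack_queue):
    """Runs in the connection thread; acks every task finished so far."""
    acked = []
    while True:
        try:
            tag = ack_queue.get_nowait()
        except queue.Empty:
            break
        channel.basic_ack(delivery_tag=tag)
        acked.append(tag)
    return acked
```

In the connection thread, a loop that alternates something like `connection.process_data_events(time_limit=1)` with `drain_acks(channel, ack_queue)` could take the place of `start_consuming()`; whether that fits depends on the pika version in use. If `handler` raises, the tag is never queued, so the message stays unacknowledged.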

Answer Source

Disable auto-acknowledge and set the prefetch count to something bigger than 1, depending on how many messages you would like your consumer to take at once.

Here is how to set the prefetch count: channel.basic_qos(prefetch_count=1). Replace 1 with the number of unacknowledged messages you want the consumer to hold.
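
As a rough sketch of both steps together, assuming the pika 0.10.x call signatures used in the question (`basic_consume(callback, queue=..., no_ack=False)`; pika 1.x renamed `no_ack` to `auto_ack` and changed the argument order). The helper name `configure_consumer` and the default of 10 are illustrative only.

```python
def configure_consumer(channel, queue, callback, prefetch_count=10):
    """Set up manual acks with a bounded number of unacked messages.

    `channel` is assumed to be a pika 0.10.x channel, for example the
    result of BlockingConnection.channel().
    """
    if prefetch_count < 1:
        raise ValueError("prefetch_count must be at least 1")
    # Allow the broker to deliver up to `prefetch_count` messages
    # before it requires an acknowledgement for any of them.
    channel.basic_qos(prefetch_count=prefetch_count)
    # no_ack=False: the consumer has to call basic_ack itself.
    return channel.basic_consume(callback, queue=queue, no_ack=False)
```

With a prefetch count of N, up to N tasks can be running in threads before the broker stops delivering new messages, so N also acts as a cap on the number of concurrent ansible runs.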
