silgon - 6 months ago
Python Question

ZeroMQ Pub/Sub: handle the last element in the queue differently from the other elements

I started using zeromq with Python, following the Publisher/Subscriber reference. However, I can't find any documentation about how to handle messages in the queue. I want to treat the last received message differently from the rest of the elements in the queue.

Example



publisher.py



import zmq
import random
import time

port = "5556"
topic = "1"

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
    messagedata = random.randrange(1,215)
    print "%s %d" % (topic, messagedata)
    socket.send("%s %d" % (topic, messagedata))
    time.sleep(.2)


subscriber.py



import zmq

port = "5556"
topic = "1"

context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Connecting..."
socket.connect ("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE,topic)

while True:
    if isLastMessage(): # probably based on socket.recv()
        analysis_function() # time consuming function
    else:
        simple_function() # something simple like print and save in memory


I just want to know how to create the isLastMessage() function used in the subscriber.py file, either with something built into zeromq or with a workaround.

Answer

Sorry, I will keep the question for reference. I just found the answer: the documentation describes a NOBLOCK flag that you can pass to recv. With this flag, recv does not block; when no message is waiting, it raises zmq.Again instead. A simple workaround, extracted from part of another answer, is the following:

while True:
    try:
        #check for a message, this will not block
        message = socket.recv(flags=zmq.NOBLOCK)

        #a message has been received
        print "Message received:", message

    except zmq.Again as e:
        print "No message received yet"
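
If catching the exception feels awkward, the same "is anything else waiting?" check can probably also be written with the socket's poll method, which pyzmq provides. This is an alternative technique, not part of the workaround above. In this minimal sketch, has_pending is a hypothetical helper name and sock is assumed to be the SUB socket from subscriber.py.

```python
def has_pending(sock):
    """Return True if at least one message can be received right now.

    `sock` is assumed to be a pyzmq socket. poll() with a timeout of
    0 ms returns immediately; by default it checks for incoming
    messages and returns 0 when none is ready.
    """
    return sock.poll(timeout=0) != 0
```

With this helper, a message just returned by socket.recv() can be treated as the last one in the queue (for the moment) when has_pending(socket) is False.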

As for the real implementation, you cannot be sure that a message is the last one until you try to receive the next one with the NOBLOCK flag and end up in the exception block. This translates to something like the following:

msg = subscribe(in_socket)
is_last = False
while True:
    if is_last:
        msg = subscribe(in_socket)
        is_last = False
    else:
        try:
            old_msg = msg
            msg = subscribe(in_socket,flags=zmq.NOBLOCK)
            # if new message was received, then process the old message
            process_not_last(old_msg)
        except zmq.Again as e:
            process_last(msg)
            is_last = True  # it is probably the last message
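
The same look-ahead idea can also be packaged as a generator, which keeps the bookkeeping out of the main loop. This is only a sketch under some assumptions: tag_last and zmq_receivers are hypothetical names, and subscribe above is assumed to behave like socket.recv (blocking by default, raising zmq.Again when called with NOBLOCK on an empty queue).

```python
def tag_last(recv_blocking, recv_nowait):
    """Yield (message, is_last) pairs forever.

    recv_blocking() must block until a message arrives.
    recv_nowait() must return a message, or None if nothing is queued.
    """
    msg = recv_blocking()
    while True:
        nxt = recv_nowait()
        if nxt is None:
            # Nothing else is queued right now, so msg is probably the last one.
            yield msg, True
            msg = recv_blocking()  # wait for the next message
        else:
            # A newer message exists, so msg was not the last one.
            yield msg, False
            msg = nxt


def zmq_receivers(sock):
    """Build the two callables for a pyzmq socket (hypothetical helper)."""
    import zmq  # imported here so tag_last itself does not depend on pyzmq

    def recv_nowait():
        try:
            return sock.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            return None

    return sock.recv, recv_nowait
```

The subscriber loop then becomes something like:

    for msg, is_last in tag_last(*zmq_receivers(socket)):
        if is_last:
            process_last(msg)
        else:
            process_not_last(msg)

As in the loop above, "last" only means that no newer message was queued at the moment of the check.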