jocerfranquiz - 2 months ago
Python Question

How to implement a RabbitMQ consumer using Pyspark Streaming module?

I have an Apache Spark cluster and a RabbitMQ broker, and I want to consume messages and compute some metrics using the pyspark.streaming module.

The problem is that I only found this package, but it is implemented in Java and Scala. Besides that, I didn't find any example or bridge implementation in Python.

I have a consumer implemented using Pika, but I don't know how to pass the payload to my StreamingContext.

Answer

This solution uses the Pika asynchronous consumer example and the socketTextStream method from Spark Streaming.

  1. Download the example and save it as a .py file
  2. Modify the file to use your own RabbitMQ credentials and connection parameters. In my case I had to modify the Consumer class
  3. Under if __name__ == '__main__': open a socket bound to the HOST and PORT that Spark Streaming will connect to over TCP. Save the sendall method of the accepted connection in a variable and pass it to the Consumer class

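    import socket  # goes with the other imports at the top of the file

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
      s.bind((HOST, PORT))
      s.listen(1)
      conn, addr = s.accept()  # blocks until Spark Streaming connects
      with conn:  # close the connection to Spark when the consumer exits
        dispatcher = conn.sendall  # assigning sendall to dispatcher variable
        consumer = Consumer(dispatcher)
        try:
          consumer.run()
        except (KeyboardInterrupt, Exception):
          consumer.stop()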
    
  4. Modify the __init__ method in Consumer to accept the dispatcher

    def __init__(self, dispatcher):
      self._connection = None
      self._channel = None
      self._closing = False
      self._consumer_tag = None
      # amqp_url is no longer a parameter, so it must be defined at module level
      self._url = amqp_url
      # new code
      self._dispatcher = dispatcher
    
  5. In the on_message method of Consumer, call self._dispatcher to send the body of the AMQP message

    def on_message(self, unused_channel, basic_deliver, properties, body):
      self._channel.basic_ack(basic_deliver.delivery_tag)
      # socketTextStream splits its input on '\n', so end each message with one
      self._dispatcher(bytes(body.decode("utf-8") + '\n', "utf-8"))
    
  6. In Spark, call ssc.socketTextStream(HOST, int(PORT)) with the HOST and PORT of our TCP socket. Spark will manage the connection

  7. Run the consumer first, then the Spark application. The consumer waits in accept() until Spark connects
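
For step 6, the Spark side might look like the minimal sketch below. The names split_words and run_streaming_job, the application name, the 5-second batch interval, and the word-count metric are illustrative assumptions and not part of the original setup; only socketTextStream comes from the steps above.

```python
from operator import add


def split_words(line):
    """Split one received line (one AMQP message body) into words."""
    return line.split()


def run_streaming_job(host, port, batch_seconds=5):
    """Read newline-delimited messages from the consumer's socket and print metrics."""
    # pyspark is imported here so that split_words can be tried without Spark installed
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="RabbitMQSocketBridge")  # hypothetical application name
    ssc = StreamingContext(sc, batch_seconds)          # assumed batch interval in seconds

    # Each line sent by the consumer's dispatcher becomes one record of the DStream
    lines = ssc.socketTextStream(host, int(port))

    # Example metrics: messages per batch and word counts per batch
    lines.count().pprint()
    lines.flatMap(split_words).map(lambda word: (word, 1)).reduceByKey(add).pprint()

    ssc.start()
    ssc.awaitTermination()
```

Calling run_streaming_job(HOST, PORT) from a script submitted to the cluster, after the consumer is already listening, should print the two metrics once per batch.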

Final remarks:

  • Try to run your consumer on a machine other than your Spark machine
  • Any port above 10000 should be OK. Choose the port explicitly rather than letting the kernel pick a random one
  • Platform: Linux Debian 7 and 8, and Ubuntu 14.04 and 16.04
  • Pika version 0.10.0
  • Python version 3.5.2
  • Spark version 1.6.1, 1.6.2, and 2.0.0
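
To check the socket hand-off on its own, without RabbitMQ or Spark, a small local stand-in can help. The sketch below is an assumption-laden test harness, not part of the consumer: frame, serve_once, read_lines and demo are hypothetical names, the payloads are made up, and read_lines only imitates how a line-oriented reader such as socketTextStream would see the data. It binds to port 0 purely so the local check never collides with a port in use; a real deployment would use a fixed port as recommended above.

```python
import socket
import threading


def frame(body):
    """Return the message body as UTF-8 bytes terminated by a newline."""
    return (body.decode("utf-8") + "\n").encode("utf-8")


def serve_once(server, payloads):
    """Accept one client and send every payload through a sendall dispatcher."""
    conn, _addr = server.accept()
    with conn:
        dispatcher = conn.sendall  # same hand-off the Consumer receives
        for body in payloads:
            dispatcher(frame(body))


def read_lines(host, port):
    """Stand-in for the Spark side: connect and read newline-delimited text until EOF."""
    with socket.create_connection((host, port)) as client:
        with client.makefile("r", encoding="utf-8", newline="\n") as stream:
            return [line.rstrip("\n") for line in stream]


def demo():
    """Run the server in a thread, read from it, and return the received lines."""
    payloads = [b"first message", b"second message"]  # made-up message bodies
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("127.0.0.1", 0))  # port 0 for this local check only
        server.listen(1)
        port = server.getsockname()[1]
        worker = threading.Thread(target=serve_once, args=(server, payloads))
        worker.start()
        lines = read_lines("127.0.0.1", port)
        worker.join()
    return lines
```

If demo() returns one entry per payload, the newline framing and the dispatcher hand-off behave as the steps describe.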