
How to implement a RabbitMQ consumer using Pyspark Streaming module?

I have an Apache Spark cluster and a RabbitMQ broker, and I want to consume messages and compute some metrics using the PySpark Streaming module.


The problem is that the only package I found is implemented in Java and Scala. Besides that, I didn't find any example or bridge implementation in Python.

I have a consumer implemented using Pika, but I don't know how to pass the payload to my Spark Streaming application.


This solution uses Pika's asynchronous consumer example and the socketTextStream method from Spark Streaming:

  1. Download the example and save it as a .py file.
  2. Modify the file to use your own RabbitMQ credentials and connection parameters. In my case, I had to modify the Consumer class; for example:
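
     As an illustration, the broker URL might take this shape (host and credentials here are placeholders, not values from the original setup):

    AMQP_URL = 'amqp://user:password@rabbitmq-host:5672/%2F'  # placeholder URL; %2F is the URL-encoded default vhost '/'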
  3. Under if __name__ == '__main__':, open a socket with the HOST and PORT that Spark Streaming will connect to (add import socket at the top of the file). Save the socket's sendall method into a variable and pass it to the Consumer class:

    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind((HOST, PORT))
            s.listen(1)  # wait for Spark Streaming to connect
            conn, addr = s.accept()
            dispatcher = conn.sendall  # assigning sendall to the dispatcher variable
            consumer = Consumer(AMQP_URL, dispatcher)
            consumer.run()
    except Exception as e:
        print(e)
  4. Modify the __init__ method in Consumer to accept the dispatcher (and the broker URL):

    def __init__(self, amqp_url, dispatcher):
        self._connection = None
        self._channel = None
        self._closing = False
        self._consumer_tag = None
        self._url = amqp_url
        # new code: keep a reference to the socket's sendall method
        self._dispatcher = dispatcher
  5. In the on_message method inside Consumer, call self._dispatcher to send the body of the AMQP message to Spark:

    def on_message(self, unused_channel, basic_deliver, properties, body):
        try:
            self._dispatcher(body + b'\n')  # socketTextStream needs a '\n' after each row
        except Exception as e:
            print(e)
        self.acknowledge_message(basic_deliver.delivery_tag)
  6. In Spark, put ssc.socketTextStream(HOST, int(PORT)) with the HOST and PORT of our TCP socket; Spark will manage the connection. A minimal sketch of the Spark side follows:
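
     This sketch assumes HOST and PORT match the socket opened in step 3; the per-batch message count is only an illustrative metric, not part of the original setup:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName='rabbitmq-metrics')
    ssc = StreamingContext(sc, 10)  # 10-second batch interval

    lines = ssc.socketTextStream(HOST, int(PORT))
    lines.count().pprint()  # illustrative metric: messages received per batch

    ssc.start()
    ssc.awaitTermination()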

  7. Run the consumer first, then the Spark application.

Final remarks:

  • Try to run your consumer on a different machine from your Spark machine
  • Any port over 10000 should be OK; don't let the kernel pick a random port
  • Platform: Linux Debian 7 and 8, and Ubuntu 14.04 and 16.04
  • Pika version 0.10.0
  • Python version 3.5.2
  • Spark version 1.6.1, 1.6.2, and 2.0.0