mvg mvg - 3 years ago 272
Python Question

Integrating Apache Kafka with Apache Spark Streaming using Python

I am trying to integrate Apache Kafka with Apache spark streaming using Python (I am new to all these).

For this I have done the following steps

  1. Started Zookeeper

  2. Started Apache Kafka

  3. Added topic in Apache Kafka

  4. Managed to list available topics using this command

bin/ --list --zookeeper localhost:2181

  1. I have taken the Kafka word count code from here

and the code is

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: <zk> <topic>", file=sys.stderr)

sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)

zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)


  1. I executed the code using the command

./spark-submit /root/girish/python/ localhost:2181

and I got this error

Traceback (most recent call last):
File "/root/girish/python/", line 28, in <module>
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
File "/root/spark-", line 72, in createStream
raise e
py4j.protocol.Py4JJavaError: An error occurred while calling o23.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
at Method)
at java.lang.ClassLoader.loadClass(
at java.lang.ClassLoader.loadClass(
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at py4j.reflection.MethodInvoker.invoke(
at py4j.reflection.ReflectionEngine.invoke(
at py4j.Gateway.invoke(
at py4j.commands.AbstractCommand.invokeMethod(
at py4j.commands.CallCommand.execute(

  1. I have updated the execution code using the answer from this question

spark submit failed with spark streaming workdcount python code


./spark-submit --jars /root/spark-,/usr/hdp/,/usr/hdp/,/usr/hdp/ /root/girish/python/ localhost:2181 <topic name>

Now I am getting this error

File "/root/girish/python/", line 28, in <module>
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
File "/root/spark-", line 67, in createStream
jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
File "/root/spark-", line 529, in __call__
File "/root/spark-", line 265, in get_command_part
AttributeError: 'dict' object has no attribute '_get_object_id'

Please help to solve this issue.

Thanks in advance

PS: I am using Apache Spark 1.2

mvg mvg
Answer Source

Problem solved by using Apache Spark 1.3 which has better support for Python than version 1.2

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download