Scala Question

How to write to Kafka from Spark Streaming

I'm using Spark Streaming to process data between two Kafka queues, but I can't seem to find a good way to write to Kafka from Spark. I've tried this:

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// brokers and input (the Kafka DStream) are defined elsewhere
input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String =>
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
    }
  )
)


and it works as intended, but instantiating a new KafkaProducer for every message is clearly infeasible in a real-world context, and I'm trying to work around it.

KafkaProducer is not serializable, obviously.

I would like to keep a reference to a single instance for every process and access it when I need to send a message. How can I do that?

Answer

My first advice would be to try creating a new instance in foreachPartition and measure whether that is fast enough for your needs (instantiating heavy objects inside foreachPartition is what the official documentation suggests).
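A minimal sketch of that approach, reusing the brokers, input stream and "output" topic from your question (one producer per partition and micro-batch instead of one per record):

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // build the config once per partition, not once per record
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    // one producer handles the whole partition, then gets closed
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { x =>
      producer.send(new ProducerRecord[String, String]("output", null, x))
    }
    producer.close()
  }
}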

Another option is to use an object pool as illustrated in this example:

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

However, I found it hard to implement when checkpointing is used.
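For reference, here is a rough sketch of the pooled-producer idea, using Apache Commons Pool 2 instead of the pool classes from the linked project (ProducerObjectFactory, ProducerPool and the hard-coded broker address are illustrative assumptions, not that project's code):

import java.util.Properties

import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Tells the pool how to create, wrap and destroy producers.
class ProducerObjectFactory(props: Properties)
    extends BasePooledObjectFactory[KafkaProducer[String, String]] {

  override def create(): KafkaProducer[String, String] =
    new KafkaProducer[String, String](props)

  override def wrap(p: KafkaProducer[String, String]): PooledObject[KafkaProducer[String, String]] =
    new DefaultPooledObject(p)

  override def destroyObject(po: PooledObject[KafkaProducer[String, String]]): Unit =
    po.getObject.close()
}

object ProducerPool {
  // A Scala object is initialised once per executor JVM, so the pool itself
  // never has to be serialized. Config is hard-coded here for brevity.
  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  lazy val pool = new GenericObjectPool(new ProducerObjectFactory(props))
}

// Borrow a producer per partition, use it for all records, then return it
// so the same instance can be reused by later batches.
input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val producer = ProducerPool.pool.borrowObject()
    try partition.foreach { x =>
      producer.send(new ProducerRecord[String, String]("output", null, x))
    } finally ProducerPool.pool.returnObject(producer)
  }
}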

Another version that is working well for me is a factory as described in the following blog post; you just have to check whether it provides enough parallelism for your needs (see the comments section):

http://allegro.tech/2015/08/spark-kafka-integration.html
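The shape of that pattern is, roughly, a serializable wrapper that creates its KafkaProducer lazily on each executor and is shared via a broadcast variable. A sketch of that idea (KafkaSink and the exact signatures here are illustrative, not the post's verbatim code; ssc is assumed to be your StreamingContext and props the config map from your question):

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  // created lazily, i.e. only after the wrapper has been shipped to an executor
  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, value))
}

object KafkaSink {
  def apply(props: HashMap[String, Object]): KafkaSink = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[String, String](props)
      // flush buffered records when the executor JVM shuts down
      sys.addShutdownHook { producer.close() }
      producer
    }
    new KafkaSink(createProducerFunc)
  }
}

// Broadcast the sink once; every executor then reuses its single lazily
// created producer for all records it processes.
val kafkaSink = ssc.sparkContext.broadcast(KafkaSink(props))
input.foreachRDD { rdd =>
  rdd.foreach { x => kafkaSink.value.send("output", x) }
}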