user1521672 - 2 months ago
Scala Question

KafkaUtils API | offset management | Spark Streaming

I am trying to manage Kafka offsets to get exactly-once semantics.

I am facing a problem while creating a direct stream from an offset map, as follows:

val fromOffsets : (TopicAndPartition, Long) = TopicAndPartition(metrics_rs.getString(1), metrics_rs.getInt(2)) -> metrics_rs.getLong(3)

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String, String)] (ssc,kafkaParams,fromOffsets,messageHandler)


here,

val messageHandler =
(mmd: MessageAndMetadata[String, String]) => mmd.message.length


And

metrics_rs = metricsStatement.executeQuery("SELECT part,off from metrics.txn_offsets where topic='"+t+"'")


I suspect I am doing something wrong in the declaration; any help would be appreciated.
The compilation error says "too many type arguments for createDirectStream".

Answer

I see a couple of things you're doing wrong.

createDirectStream expects fromOffsets to be a Map[TopicAndPartition, Long], but you currently pass a Tuple2[TopicAndPartition, Long]. You need:

val fromOffsets: Map[TopicAndPartition, Long] = 
    Map(TopicAndPartition(metrics_rs.getString(1), 
                          metrics_rs.getInt(2)) -> metrics_rs.getLong(3))
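
Since a topic usually has several partitions, the map will typically need one entry per stored row rather than a single pair. The following is only a rough sketch of that idea: TopicAndPartition here is a local stand-in for kafka.common.TopicAndPartition so the snippet runs without Kafka on the classpath, and storedRows is hypothetical sample data standing in for rows read from metrics.txn_offsets. Note also that the query in the question selects only part and off, so reading a third column with getLong(3) would presumably fail; the sketch assumes each row carries topic, partition and offset.

```scala
object OffsetMapSketch {
  // Stand-in for kafka.common.TopicAndPartition (assumption: same two fields).
  final case class TopicAndPartition(topic: String, partition: Int)

  // Hypothetical rows as (topic, partition, offset); in the real job these
  // would come from iterating the JDBC ResultSet with next().
  val storedRows: Seq[(String, Int, Long)] = Seq(
    ("txn", 0, 120L),
    ("txn", 1, 95L)
  )

  // Build one map entry per partition.
  val fromOffsets: Map[TopicAndPartition, Long] =
    storedRows.map { case (topic, partition, offset) =>
      TopicAndPartition(topic, partition) -> offset
    }.toMap

  def main(args: Array[String]): Unit =
    fromOffsets.foreach { case (tp, offset) => println(s"$tp -> $offset") }
}
```

With the real Kafka class imported instead of the stand-in, the resulting fromOffsets value has the type that createDirectStream expects.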

Your last type argument to createDirectStream says each record in the stream is a tuple of type (String, String), yet your messageHandler returns an Int (mmd.message.length). If you want to return key-value pairs as tuples, you need:

val messageHandler: MessageAndMetadata[String, String] => (String, String) =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
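
To make the mismatch concrete, here is a small sketch comparing the two handler shapes. MessageAndMetadata below is a simplified stand-in for kafka.message.MessageAndMetadata, reduced to plain key and message fields so the snippet runs on its own; the sample values are made up.

```scala
object MessageHandlerSketch {
  // Stand-in for kafka.message.MessageAndMetadata, reduced to two fields.
  final case class MessageAndMetadata[K, V](key: K, message: V)

  // Returns Int, so it only fits a stream whose record type is Int.
  val lengthHandler: MessageAndMetadata[String, String] => Int =
    mmd => mmd.message.length

  // Returns (String, String), matching a record type of (String, String).
  val pairHandler: MessageAndMetadata[String, String] => (String, String) =
    mmd => (mmd.key, mmd.message)

  def main(args: Array[String]): Unit = {
    val sample = MessageAndMetadata("user-1", "hello")
    println(lengthHandler(sample))
    println(pairHandler(sample))
  }
}
```

Either handler is valid on its own; what matters is that the handler's return type agrees with the last type argument given to createDirectStream.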

With both fixes in place, this should compile:

val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)