khateeb - 1 month ago
Java Question

How do I make Spark consume from the previous offset id on restarting?

I am using Spark to consume data from Kafka. After consuming some data and restarting Spark, how do I ensure that Spark resumes consuming from the offset where it left off?

For example, if on the first run Spark had consumed up to offset id x, how do I ensure that on the next run it will start with offset id x+1?

SparkConf sparkConf = new SparkConf().setAppName("name");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "127.0.0.1");
kafkaParams.put("group.id", App.GROUP);

JavaPairReceiverInputDStream<String, EventLog> messages =
        KafkaUtils.createStream(jssc, String.class, EventLog.class,
                StringDecoder.class, EventLogDecoder.class,
                kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());

Answer

In the receiver-based approach (which you're using via KafkaUtils.createStream), the offsets are saved and handled by the WAL (Write Ahead Log), which is responsible for resuming your application from the proper offsets. If you want to control exactly where your application resumes from, look into KafkaUtils.createDirectStream and, more generally, the Direct (receiverless) streaming approach, using the overload that takes fromOffsets: Map[TopicAndPartition, Long].
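With the direct approach you are responsible for persisting offsets yourself. Below is a minimal Java sketch against the spark-streaming-kafka 0.8 API, reusing the EventLog and EventLogDecoder classes from the question. The loadOffsets and saveOffsets helpers are hypothetical placeholders for your own persistence code (ZooKeeper, a database, HDFS, etc.), and the broker address is an assumption:

```java
import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

public class ResumeFromSavedOffsets {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("name");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        // The direct API talks to the Kafka brokers directly, not to ZooKeeper.
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "127.0.0.1:9092"); // assumed broker address

        // Load the offsets persisted at the end of the previous run
        // (loadOffsets is a placeholder for your own storage code).
        Map<TopicAndPartition, Long> fromOffsets = loadOffsets();

        JavaInputDStream<EventLog> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class, EventLog.class,
                StringDecoder.class, EventLogDecoder.class,
                EventLog.class,
                kafkaParams,
                fromOffsets,
                (MessageAndMetadata<String, EventLog> mmd) -> mmd.message());

        messages.foreachRDD(rdd -> {
            // Each batch RDD carries the offset ranges it covers.
            OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            // ... process the RDD ...

            // Persist the end offsets so the next run can resume from them
            // (saveOffsets is a placeholder for your own storage code).
            saveOffsets(ranges);
        });

        jssc.start();
        jssc.awaitTermination();
    }

    private static Map<TopicAndPartition, Long> loadOffsets() {
        throw new UnsupportedOperationException("load from your offset store");
    }

    private static void saveOffsets(OffsetRange[] ranges) {
        throw new UnsupportedOperationException("write to your offset store");
    }
}
```

Note that saving offsets after processing gives at-least-once semantics: if the job fails between processing and saveOffsets, the batch is reprocessed on restart. For exactly-once you would commit offsets and results in a single transaction.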