Faisal Ahmed Siddiqui Faisal Ahmed Siddiqui - 1 month ago 14
Java Question

Kafka topic partition and Spark executor mapping

I am using Spark Streaming with a Kafka topic. The topic is created with 5 partitions. All of my messages are published to the Kafka topic using the table name as the key.
Given this, I assume all messages for a given table should go to the same partition.
But I notice in the Spark log that messages for the same table sometimes go to the executor on node-1 and sometimes go to the executor on node-2.

I am running the code in yarn-cluster mode using the following command:

spark-submit --name DataProcessor --master yarn-cluster --files /opt/ETL_JAR/executor-log4j-spark.xml,/opt/ETL_JAR/driver-log4j-spark.xml,/opt/ETL_JAR/application.properties --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=driver-log4j-spark.xml" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=executor-log4j-spark.xml" --class com.test.DataProcessor /opt/ETL_JAR/etl-all-1.0.jar


This submission creates 1 driver, let's say on node-1, and 2 executors, one on node-1 and one on node-2.

I don't want the node-1 and node-2 executors to read the same partition, but this is happening.

I also tried the following configuration to specify a consumer group, but it made no difference.

kafkaParams.put("group.id", "app1");


This is how we create the stream, using the createDirectStream method
(i.e. not through ZooKeeper):

// Kafka parameters handed to the direct stream created below.
HashMap<String, String> kafkaParams = new HashMap<String, String>();
// `brokers` is defined outside this excerpt (see the complete code further down).
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("auto.offset.reset", "largest");
// Consumer group the question experimented with; reported to make no difference.
kafkaParams.put("group.id", "app1");

// Direct stream of (String key, String value) pairs, both decoded with StringDecoder.
// `jssc` and `topicsSet` are also defined outside this excerpt.
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
// key and value types
String.class,
String.class,
// key and value decoders
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);


Complete Code:

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

/**
 * Application entry point: obtains a {@link JavaStreamingContext} for the configured
 * checkpoint directory and runs it until termination.
 */
public class DataProcessor2 implements Serializable {
    private static final long serialVersionUID = 3071125481526170241L;

    private static Logger log = LoggerFactory.getLogger("DataProcessor");

    /**
     * Looks up the checkpoint directory, asks {@code JavaStreamingContext.getOrCreate}
     * for a context (passing a {@link DataProcessorContextFactory3} as the factory),
     * then starts the context and blocks until it terminates.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        final String checkpointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);

        JavaStreamingContext streamingContext =
                JavaStreamingContext.getOrCreate(checkpointDir, new DataProcessorContextFactory3());

        // Kick off the streaming job and wait for it to finish.
        streamingContext.start();
        streamingContext.awaitTermination();
    }

}

/**
 * Factory that builds the {@link JavaStreamingContext} for the data processor.
 *
 * <p>The context reads String key/value messages from the configured Kafka topic(s)
 * through a direct stream, wraps each message value in a {@link MsgStruct} and prints
 * every element of every non-empty batch.
 */
class DataProcessorContextFactory3 implements JavaStreamingContextFactory, Serializable {
    private static final long serialVersionUID = 6070911284191531450L;

    // Logger is bound to this class (previously referenced a differently named class).
    private static final Logger logger = LoggerFactory.getLogger(DataProcessorContextFactory3.class);

    DataProcessorContextFactory3() {
    }

    /**
     * Creates and wires a new streaming context.
     *
     * <p>Broker list, topic(s), offset-reset policy, batch duration (seconds) and the
     * checkpoint directory are read from {@code ApplicationProperties}. If the broker
     * list or topic is blank, a usage message is printed to stderr and the JVM exits
     * with status 1.
     *
     * @return the configured, not yet started, streaming context
     */
    @Override
    public JavaStreamingContext create() {
        logger.debug("creating new context..!");

        final String brokers = ApplicationProperties.getProperty(Consts.KAFKA_BROKERS_NAME);
        final String topic = ApplicationProperties.getProperty(Consts.KAFKA_TOPIC_NAME);
        final String app = "app1";
        final String offset = ApplicationProperties.getProperty(Consts.KAFKA_CONSUMER_OFFSET, "largest");

        logger.debug("Data processing configuration. brokers={}, topic={}, app={}, offset={}", brokers, topic, app,
                offset);
        if (StringUtils.isBlank(brokers) || StringUtils.isBlank(topic) || StringUtils.isBlank(app)) {
            System.err.println("Usage: DataProcessor <brokers> <topic>\n" + Consts.KAFKA_BROKERS_NAME
                    + " is a list of one or more Kafka brokers separated by comma\n" + Consts.KAFKA_TOPIC_NAME
                    + " is a kafka topic to consume from \n\n\n");
            System.exit(1);
        }
        final String majorVersion = "1.0";
        final String minorVersion = "3";
        final String version = majorVersion + "." + minorVersion;
        final String applicationName = "DataProcessor-" + topic + "-" + version;

        // A master set in code takes precedence over the one given to spark-submit, so
        // only fall back to a local master (dev environment) when none was supplied.
        SparkConf sparkConf = new SparkConf().setAppName(applicationName);
        if (!sparkConf.contains("spark.master")) {
            sparkConf.setMaster("local[*]");
        }

        // Batch interval in seconds; defaults to 10 when the property is absent.
        final long sparkBatchDuration = Long
                .parseLong(ApplicationProperties.getProperty(Consts.SPARK_BATCH_DURATION, "10"));

        final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchDuration));
        logger.debug("setting checkpoint directory={}", sparkCheckPointDir);
        jssc.checkpoint(sparkCheckPointDir);

        // The topic property may hold several comma-separated topic names.
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));

        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("auto.offset.reset", offset);
        // Use the same application id that is logged and validated above.
        kafkaParams.put("group.id", app);

        // @formatter:off
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
        );
        // @formatter:on
        processRDD(messages, app);
        return jssc;
    }

    /**
     * Registers the per-batch processing: each message is mapped to a {@link MsgStruct}
     * and, for every non-empty batch, each element of each partition is printed.
     *
     * @param messages key/value stream produced by the direct Kafka stream
     * @param app      application id (currently not used by the processing itself)
     */
    private void processRDD(JavaPairInputDStream<String, String> messages, final String app) {
        JavaDStream<MsgStruct> rdd = messages.map(new MessageProcessFunction());

        rdd.foreachRDD(new Function<JavaRDD<MsgStruct>, Void>() {

            private static final long serialVersionUID = 250647626267731218L;

            @Override
            public Void call(JavaRDD<MsgStruct> currentRdd) throws Exception {
                if (!currentRdd.isEmpty()) {
                    logger.debug("Receive RDD. Create JobDispatcherFunction at HOST={}", FunctionUtil.getHostName());
                    currentRdd.foreachPartition(new VoidFunction<Iterator<MsgStruct>>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public void call(Iterator<MsgStruct> arg0) throws Exception {
                            while (arg0.hasNext()) {
                                System.out.println(arg0.next().toString());
                            }
                        }
                    });
                } else {
                    logger.debug("Current RDD is empty.");
                }
                return null;
            }
        });
    }

    /** Maps a Kafka (key, value) pair to a {@link MsgStruct} built from the value. */
    public static class MessageProcessFunction implements Function<Tuple2<String, String>, MsgStruct> {
        private static final long serialVersionUID = 1L;

        /**
         * Prints the message value and wraps it in a {@link MsgStruct}.
         *
         * @param data (key, value) pair; only the value is used
         * @return the parsed message
         */
        @Override
        public MsgStruct call(Tuple2<String, String> data) throws Exception {
            String message = data._2();
            System.out.println("message:" + message);
            return MsgStruct.parse(message);
        }

    }

    /** Serializable holder for the raw text of a single message. */
    public static class MsgStruct implements Serializable {
        private static final long serialVersionUID = 1L;

        private String message;

        /**
         * Builds a {@code MsgStruct} holding the given text as-is.
         *
         * @param msg raw message text
         * @return a new instance wrapping {@code msg}
         */
        public static MsgStruct parse(String msg) {
            MsgStruct m = new MsgStruct();
            m.message = msg;
            return m;
        }

        @Override
        public String toString() {
            return "content inside=" + message;
        }
    }

}

Answer

With the DirectStream approach, it is correct to assume that messages sent to a given Kafka partition will land in the same Spark partition.

What we cannot assume is that each Spark partition will be processed by the same Spark worker each time. On each batch interval, a Spark task is created for the OffsetRange of each partition and sent to the cluster for processing, landing on whichever worker is available.

What you are looking for is partition locality. The only partition locality that the direct Kafka consumer supports is the Kafka host containing the offset range being processed, in the case that your Spark and Kafka deployments are colocated; but that's a deployment topology that I don't see very often.

If your requirements dictate the need for host locality, you should look into Apache Samza or Kafka Streams.

Comments