Arun A K - 6 months ago
Java Question

Spark Java Accumulator not incrementing

I have just started taking baby steps with Spark in Java. Below is a word count program that uses a stop word list to skip any words that appear in it. I have two accumulators to count the skipped and unskipped words.

However, the Sysout at the end of the program always reports both accumulator values as 0.

Please point out where I am going wrong.

public static void main(String[] args) throws FileNotFoundException {

    SparkConf conf = new SparkConf();
    conf.setAppName("Third App - Word Count WITH BroadCast and Accumulator");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    JavaRDD<String> fileRDD = jsc.textFile("hello.txt");
    JavaRDD<String> words = fileRDD.flatMap(new FlatMapFunction<String, String>() {

        public Iterable<String> call(String aLine) throws Exception {
            return Arrays.asList(aLine.split(" "));
        }
    });

    String[] stopWordArray = getStopWordArray();

    final Accumulator<Integer> skipAccumulator = jsc.accumulator(0);
    final Accumulator<Integer> unSkipAccumulator = jsc.accumulator(0);

    final Broadcast<String[]> stopWordBroadCast = jsc.broadcast(stopWordArray);

    JavaRDD<String> filteredWords = words.filter(new Function<String, Boolean>() {

        public Boolean call(String inString) throws Exception {
            boolean filterCondition = !Arrays.asList(stopWordBroadCast.getValue()).contains(inString);
            if (!filterCondition) {
                System.out.println("Filtered a stop word ");
                skipAccumulator.add(1);
            } else {
                unSkipAccumulator.add(1);
            }
            return filterCondition;
        }
    });

    System.out.println("$$$$$$$$$$$$$$$ Filtered Count " + skipAccumulator.value());
    System.out.println("$$$$$$$$$$$$$$$ UN Filtered Count " + unSkipAccumulator.value());

    /* rest of code - works fine */
    jsc.stop();
    jsc.close();
}


I am building a runnable jar and submitting the job on Hortonworks Sandbox 2.4 using

spark-submit jarname


------------EDIT----------------

The rest of the code, which goes in the commented portion:

JavaPairRDD<String, Integer> wordOccurrence = filteredWords.mapToPair(new PairFunction<String, String, Integer>() {

    public Tuple2<String, Integer> call(String inWord) throws Exception {
        return new Tuple2<String, Integer>(inWord, 1);
    }
});

JavaPairRDD<String, Integer> summed = wordOccurrence.reduceByKey(new Function2<Integer, Integer, Integer>() {

    public Integer call(Integer a, Integer b) throws Exception {
        return a + b;
    }
});

summed.saveAsTextFile("hello-out");

Answer

You missed posting the important part, /* rest of code - works fine */. I can just about guarantee that you are calling some action in that rest of the code (your edit confirms it: saveAsTextFile is an action), and it is that action which triggers the DAG to execute the code containing the accumulators. Because your println statements run before any action has been called, the filter has not yet executed and both accumulators still hold 0. Try adding a filteredWords.collect() (or filteredWords.count()) before the println and you should see the counts. Remember that Spark transformations are lazy and only execute when an action is called.
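The same pitfall can be sketched without Spark at all, since java.util.stream pipelines are lazy in exactly the same way: a filter with a side-effecting counter does nothing until a terminal operation runs. This is only an illustrative analogy (the names and data here are made up, and an AtomicInteger stands in for the Spark Accumulator), not the questioner's code:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyFilterDemo {
    public static void main(String[] args) {
        // Stand-in for the Spark accumulator counting skipped stop words.
        AtomicInteger skipped = new AtomicInteger();
        List<String> stopWords = List.of("the", "a");

        // Building the pipeline is analogous to a Spark transformation:
        // nothing executes yet, it only describes the computation.
        Stream<String> filtered = Stream.of("the", "quick", "a", "fox")
            .filter(w -> {
                if (stopWords.contains(w)) {
                    skipped.incrementAndGet();
                    return false;
                }
                return true;
            });

        // Reading the counter before any terminal operation: still 0,
        // just like the accumulators printed before a Spark action.
        System.out.println("before terminal op: " + skipped.get());

        // A terminal operation plays the role of a Spark action and
        // forces the filter (and its side effects) to actually run.
        filtered.forEach(w -> { });

        System.out.println("after terminal op: " + skipped.get());
    }
}
```

Running this prints 0 before the terminal operation and 2 after it, which mirrors why the question's println shows 0: the counter only moves once something forces the pipeline to execute.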