Patrick Patrick - 8 months ago 394
Java Question

Spark streaming mapWithState timeout delayed?

I expected the new mapWithState API for Spark 1.6+ to near-immediately remove objects that are timed-out, but there is a delay.

I'm testing the API with the adapted version of the JavaStatefulNetworkWordCount below:

SparkConf sparkConf = new SparkConf()

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

StateSpec<String, Integer, Integer, Tuple2<String, Integer>> mappingFunc =
StateSpec.function((word, one, state) -> {
if (state.isTimingOut())
System.out.println("Timing out the word: " + word);
return new Tuple2<String,Integer>(word, state.get());
int sum = one.or(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
return output;

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
.flatMap(x -> Arrays.asList(SPACE.split(x)))
.mapToPair(w -> new Tuple2<String, Integer>(w, 1))


Together with nc (
nc -l -p <port>

When I type a word into the nc window I see the tuple being printed in the console every second. But it doesn't seem like the timing out message gets printed out 5s later, as expected based on the timeout set. The time it takes for the tuple to expire seems to vary between 5 & 20s.

Am I missing some configuration option, or is the timeout perhaps only performed at the same time as snapshots?


Am I missing some configuration option, or is the timeout perhaps only performed at the same time as snapshots?

Every time a mapWithState is invoked (with your configuration, around every 1 second), the MapWithStateRDD will internally check for expired records and time them out. You can see it in the code:

// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
  newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
    val returned = mappingFunction(batchTime, key, None, wrappedState)
    mappedData ++= returned

You have to take into account the time it takes for each stage to be scheduled, and the amount of time it takes for each execution of such a stage to actually take it's turn and run. It isn't accurate because this runs as a distributed systems where other factors can come into play, making your timeout more/less accurate than you expect it to be.


As @etov rightly points out, newStateMap.remove(key) doesn't actually remove the element from the OpenHashMapBasedStateMap[K, S], but simply mark it for deletion. This is also a reason why you're seeing the expiration time adding up.