Daniel Zolnai - 3 months ago
Scala Question

Spark mapWithState API explanation

I have been using the mapWithState API in Spark Streaming, but 2 things are not clear about the StateSpec.function:

Let's say my function is:

def trackStateForKey(batchTime: Time,
                     key: Long,
                     newValue: Option[JobData],
                     currentState: State[JobData]): Option[(Long, JobData)]



  1. Why is the new value an Option[T] type? As far as I've seen, it was always defined for me, and since the method is supposed to be called with a new state, I don't really see the point why it could be optional.

  2. What does the return value mean? I tried to find some pointers in the documentation and source code, but none of them describe what it is used for. Since I'm modifying the state of a key using state.remove() and state.update(), why would I have to do the same with return values?

    In my current implementation I return None if I remove the key, and Some(newState) if I update it, but I'm not sure if that is correct.


Answer

Why is the new value an Option[T] type? As far as I've seen, it was always defined for me, and since the method is supposed to be called with a new state, I don't really see the point why it could be optional.

It is an Option[T] because, if you set a timeout using StateSpec.timeout, e.g.:

StateSpec.function(spec _).timeout(Milliseconds(5000))

then the value passed in when the state for a key times out will be None, and the isTimingOut method on State[T] will yield true. This makes sense, because a timeout of the state doesn't mean that a new value has arrived for the specified key. It is also generally safer than passing null for T (which wouldn't work for primitives anyway), since the user is expected to operate safely on an Option[T].

You can see that in Spark's implementation:

// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
  newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
    wrappedState.wrapTimingOutState(state)
    val returned = mappingFunction(batchTime, key, None, wrappedState) // <-- This.
    mappedData ++= returned
    newStateMap.remove(key)
  }
}
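
The timeout branch above can be mimicked in a small, dependency-free sketch of a mapping function that handles both kinds of call. Everything here is an assumption made for illustration: SketchState is a hypothetical stand-in (not Spark's class) that only mirrors the exists, get, update, remove and isTimingOut methods of State[T], JobData is a guessed shape for the asker's type, and batchTimeMs replaces Spark's Time so the snippet runs without Spark on the classpath.

```scala
object TimeoutSketch {
  // Guessed shape for the asker's JobData; the real type is not shown in the question.
  final case class JobData(progress: Int)

  // Hypothetical stand-in for Spark's State[T], for illustration only.
  // As far as I know, the real State also rejects update/remove while timing out.
  final class SketchState[S](initial: Option[S], timingOut: Boolean) {
    private var value: Option[S] = initial
    def exists(): Boolean = value.isDefined
    def get(): S = value.get
    def isTimingOut(): Boolean = timingOut
    def update(newState: S): Unit = {
      require(!timingOut, "cannot update a state that is timing out")
      value = Some(newState)
    }
    def remove(): Unit = {
      require(!timingOut, "cannot remove a state that is timing out")
      value = None
    }
  }

  // Same shape as the function in the question, minus the Spark-specific types.
  def trackStateForKey(batchTimeMs: Long,
                       key: Long,
                       newValue: Option[JobData],
                       currentState: SketchState[JobData]): Option[(Long, JobData)] =
    newValue match {
      case Some(data) =>
        // Normal call: a new record arrived for this key.
        currentState.update(data)
        Some(key -> data)
      case None if currentState.isTimingOut() =>
        // Timeout call: no new record, the key is about to be dropped.
        // Emit the last known state; do not call update() or remove() here.
        Some(key -> currentState.get())
      case None =>
        None
    }
}
```

The design point is that the None case is the only hook you get before a timed-out key disappears, so it is a natural place to emit a final record for that key.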

What does the return value mean? I tried to find some pointers in the documentation and source code, but none of them describe what it is used for. Since I'm modifying the state of a key using state.remove() and state.update(), why would I have to do the same with return values?

The return value is a way to pass intermediate state along the Spark graph. For example, assume that I want to update my state but also perform some operation in my pipeline with the intermediate data, e.g.:

dStream
  .mapWithState(stateSpec)
  .map(optionIntermediateResult => optionIntermediateResult.map(_ * 2))
  .foreachRDD( /* other stuff */)

That return value is exactly what allows me to continue operating on said data. If you don't care for the intermediate result and only want the complete state, then outputting None is perfectly fine.
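
To illustrate the difference between the stored state and the returned value, here is a rough plain-Scala model of a single batch. It is not Spark code: the mutable map stands in for the per-key state, the returned sequence stands in for the stream that mapWithState produces, and the names mapping, runBatch and threshold are all made up for this sketch.

```scala
import scala.collection.mutable

object ReturnValueSketch {
  // Hypothetical mapping logic: keep a running total per key as state,
  // but only emit a record downstream once the total reaches `threshold`.
  def mapping(key: String,
              value: Int,
              store: mutable.Map[String, Int],
              threshold: Int): Option[(String, Int)] = {
    val total = store.getOrElse(key, 0) + value
    store(key) = total // plays the role of state.update(total)
    if (total >= threshold) Some(key -> total) else None
  }

  // One batch: every record updates the store, and the returned values
  // (including the Nones) form the output that later stages see.
  def runBatch(batch: Seq[(String, Int)],
               store: mutable.Map[String, Int],
               threshold: Int): Seq[Option[(String, Int)]] =
    batch.map { case (k, v) => mapping(k, v, store, threshold) }
}
```

Running runBatch on Seq("a" -> 3, "b" -> 1, "a" -> 4) with a threshold of 5 leaves both keys in the store, while only the last record for "a" produces a Some in the output. In Spark itself, the stream returned by mapWithState carries these returned values, and the full per-key state is exposed separately through stateSnapshots().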

Edit:

I've written a blog post (following this question) which attempts to give an in-depth look at the API.
