Chobeat - 26 days ago
Scala Question

Counting latest state of stateful entities in streaming with Flink

In the last few days I have been trying to build my first real-time analytics job in Flink. The approach is kappa-architecture-like: the raw data lives in Kafka, where we receive a message for every change of state of any entity.

So the messages are of the form

(id, newStatus, timestamp)

We want to compute, for every time window, the count of items in a given status. So the output should be of the form

(outputTimestamp, state1:count1,state2:count2 ...)

or equivalent. These rows should contain, at any given time, the count of the items in each status, where the status associated with an id is the one from the most recent message observed for that id. The status of an id should be counted in any case, even if its event is much older than the ones currently being processed. So the sum of all the counts should equal the number of distinct ids observed in the system. A later step could be to forget about items in a final state after a while, but this is not a strict requirement right now.

The output will be written to Elasticsearch and then queried.

I tried many different approaches and none of them fully satisfied the requirement. With a sliding window I could easily get the expected behaviour, except that once the start of the window moved past the timestamp of an event, that event was lost for the count, as you would expect. Other approaches failed to be consistent when working through a backlog, because I did some tricks with keys and timestamps that broke when the data was processed all at once.

So I would like to know, even at a high level, how I should approach this problem. It looks like a relatively common use case, but the fact that the relevant information for a given id must be retained indefinitely to count the entities correctly creates a lot of problems.

Thank you in advance,



I think I have a solution for your problem:

Given a DataStream of (id, state, time) as:

val stateUpdates: DataStream[(Long, Int, Long)] = ??? // (id, state, time)

You derive the actual state changes as follows:

val stateCntUpdates: DataStream[(Int, Int)] = stateUpdates // (state, cntUpdate)
  .keyBy(_._1) // key by id
  .flatMap(new StateUpdater) 

StateUpdater is a stateful FlatMapFunction. It has a keyed state that stores the last state of each id. For each input record it returns two state count update records: (oldState, -1), (newState, +1). The (oldState, -1) record ensures that counts of previous states are reduced. For the first record of an id there is no old state yet, so only (newState, +1) is emitted.
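A minimal sketch of what such a StateUpdater could look like, assuming the Flink 1.x Scala API and the (Long, Int, Long) input type from above. The helper object StateDeltas and the state name "lastState" are made up for illustration. Two choices here are assumptions rather than part of the description: a record that repeats the current state of its id emits nothing, and records are taken to arrive in timestamp order per id (late or out-of-order records are not handled).

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hypothetical helper: the count updates caused by moving from `old` to `next`.
object StateDeltas {
  def deltas(old: Option[Int], next: Int): Seq[(Int, Int)] = old match {
    case Some(prev) if prev == next => Seq.empty                  // unchanged (assumed no-op)
    case Some(prev)                 => Seq((prev, -1), (next, 1)) // leave old, enter new
    case None                       => Seq((next, 1))             // first record for this id
  }
}

class StateUpdater extends RichFlatMapFunction[(Long, Int, Long), (Int, Int)] {
  // Keyed state: the last state seen for the id of the current record.
  @transient private var lastState: ValueState[java.lang.Integer] = _

  override def open(parameters: Configuration): Unit = {
    lastState = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Integer]("lastState", classOf[java.lang.Integer]))
  }

  override def flatMap(in: (Long, Int, Long), out: Collector[(Int, Int)]): Unit = {
    // value() returns null when nothing has been stored for this key yet.
    val old = Option(lastState.value()).map(_.intValue)
    StateDeltas.deltas(old, in._2).foreach(d => out.collect(d))
    lastState.update(in._2)
  }
}
```

Keeping the delta logic in a plain function makes it easy to check without starting a Flink job.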

Next you aggregate the state count changes per state and window:

val cntUpdatesPerWindow: DataStream[(Int, Int, Long)] = stateCntUpdates // (state, cntUpdate, time)
  .keyBy(_._1) // key by state
  .timeWindow(Time.minutes(10)) // window should be non-overlapping, e.g. Tumbling
  .apply(new SumReducer(), new YourWindowFunction()) 

SumReducer sums the cntUpdates and YourWindowFunction assigns the timestamp of your window. This step aggregates all state changes for each state in a window.
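The two functions might be sketched as follows, again assuming the Flink 1.x Scala API. Using the end of the window as the output timestamp is an assumption; the start of the window would work just as well as long as it is used consistently.

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Sums the count updates of two records that share the same state.
class SumReducer extends ReduceFunction[(Int, Int)] {
  override def reduce(a: (Int, Int), b: (Int, Int)): (Int, Int) =
    (a._1, a._2 + b._2)
}

// Attaches a timestamp to the pre-aggregated result of each window.
// Because of the reducer, `input` holds a single element per key and window.
class YourWindowFunction
    extends WindowFunction[(Int, Int), (Int, Int, Long), Int, TimeWindow] {
  override def apply(
      key: Int,
      window: TimeWindow,
      input: Iterable[(Int, Int)],
      out: Collector[(Int, Int, Long)]): Unit = {
    input.foreach { case (state, cntUpdate) =>
      out.collect((state, cntUpdate, window.getEnd)) // window end as timestamp (assumption)
    }
  }
}
```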

Finally, we adjust the current count with the count updates.

val stateCnts: DataStream[(Int, Int, Long)] = cntUpdatesPerWindow // (state, count, time)
  .keyBy(_._1) // key by state again
  .map(new CountUpdater)

CountUpdater is a stateful MapFunction. It has a keyed state that stores the current count for each state. For each incoming record, the count is adjusted and a record (state, newCount, time) is emitted.
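A CountUpdater along these lines could look like the sketch below, assuming the Flink 1.x Scala API. The companion helper applyDelta and the state name "count" are illustrative names, not something Flink prescribes.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

object CountUpdater {
  // Hypothetical helper: a missing count is treated as 0.
  def applyDelta(current: Option[Int], delta: Int): Int =
    current.getOrElse(0) + delta
}

class CountUpdater extends RichMapFunction[(Int, Int, Long), (Int, Int, Long)] {
  // Keyed state: the running count for the state of the current record.
  @transient private var count: ValueState[java.lang.Integer] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Integer]("count", classOf[java.lang.Integer]))
  }

  override def map(in: (Int, Int, Long)): (Int, Int, Long) = {
    // value() returns null until the first update for this key.
    val current = Option(count.value()).map(_.intValue)
    val updated = CountUpdater.applyDelta(current, in._2)
    count.update(updated)
    (in._1, updated, in._3) // (state, newCount, time)
  }
}
```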

Now you have a stream with new counts for each state (one record for each state). If possible, you can update your Elasticsearch index using these records. If you need to collect all state counts for a given time you can do that using a window.
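To check the logic of the three steps without a cluster, here is a plain Scala simulation (no Flink involved). It is only a sketch: the object name PipelineSketch is made up, updates are assumed to arrive in timestamp order, and windows are tumbling windows identified by their end timestamp. It also skips the (oldState, -1) and (newState, +1) pair when the state does not change, which is a simplification.

```scala
object PipelineSketch {
  // (id, newState, timestamp), assumed to be sorted by timestamp
  type Update = (Long, Int, Long)

  /** Returns, for every window that saw updates, (windowEnd, count per state). */
  def run(updates: Seq[Update], windowMs: Long): Seq[(Long, Map[Int, Int])] = {
    // Step 1: derive (state, cntUpdate, time) records from the last known state per id.
    var lastState = Map.empty[Long, Int]
    val deltas = updates.flatMap { case (id, st, ts) =>
      val old = lastState.get(id)
      lastState += id -> st
      old.filter(_ != st).map(o => (o, -1, ts)).toSeq ++
        (if (old.contains(st)) Nil else Seq((st, 1, ts)))
    }

    // Step 2: sum the count updates per tumbling window and state.
    val perWindow: Map[Long, Map[Int, Int]] = deltas
      .groupBy { case (_, _, ts) => (ts / windowMs + 1) * windowMs }
      .map { case (end, ds) =>
        end -> ds.groupBy(_._1).map { case (st, xs) => st -> xs.map(_._2).sum }
      }

    // Step 3: fold the per-window updates into running counts per state.
    val ends = perWindow.keys.toSeq.sorted
    ends.scanLeft((0L, Map.empty[Int, Int])) { case ((_, counts), end) =>
      val updated = perWindow(end).foldLeft(counts) { case (acc, (st, d)) =>
        acc.updated(st, acc.getOrElse(st, 0) + d)
      }
      (end, updated)
    }.tail
  }
}
```

For two ids that both start in state 0, where id 1 later moves to state 1 in the next 10-minute window, the counts after the second window are one item in state 0 and one in state 1, so the sum still matches the number of distinct ids.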

Please note: The state size of this program depends on the number of unique ids. That might cause problems if the id space grows very fast.