VladoDemcak - 1 year ago
Scala Question

Build stateful chain for different events and assign globalID in spark

We are working with Spark 1.6 and we are trying to keep a global identity for similar events. There can be a few "groups" of events with an identical ID (shown in the example as a number; the letters are added just for uniqueness). We know that some of these events are similar, so we are able to connect them. We want to keep something like:

Z -> 1, 2, 3
X -> 4


so that in the future, if events with ID 4 arrive, we can assign X as their global identity.

Please see the example below for a better illustration.

Let's say we have some streaming data coming into a Spark job:

1a
1b
2c
2d
2e
3f
3g
3h
4i


Since event 1 is the first to appear, we want to assign 1->Z. Next, we know that 1b and 2c are similar, so we want to keep a 2->1 mapping somewhere. The same holds for 2e and 3f, so we need a 3->2 mapping. So for now we have three pairs: 1->Z, 2->1, 3->2.

And we want to create a "historical" path:
Z <- 1 <- 2 <- 3

In the end, every event in this chain will have ID = Z:

1a -> Z
1b -> Z
2c -> Z
2d -> Z
2e -> Z
3f -> Z
3g -> Z
3h -> Z
4i -> X


We tried to use mapWithState, but the only thing we were able to get was 2->1 and 3->2. With mapWithState we were not able to access the state of the "parent" from the state of the current event; e.g. for current event 3 with parent 2, we could get neither 2->1 nor 1->Z.

Is it possible to have some global mapping for this? We already tried accumulators and broadcast variables, but they do not look very suitable. We were also not able to replace events 1 (for the first mapping) and events 2 (for the second mapping) with Z.

If a new event 5 arrives and it is similar to 3h, for example, we need to assign the mapping 5->Z as well.

Answer Source

What follows is a solution for the given problem, using a mutable reference to a 'state' RDD that we update with new results each time.

We use transform to tag the incoming event stream with the unique global id by doing a similarity join. This is a join "by hand" where we use a product of the two datasets and compare each entry pair-wise.
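
To make the "join by hand" idea concrete, here is a minimal sketch on plain Scala collections rather than RDDs. The similarity rule below (an id is similar to its direct predecessor) is an assumption chosen only for illustration, and the object and method names are hypothetical; the real `isSimilar` depends on your events.

```scala
object SimilarityJoinSketch {
  // Hypothetical similarity rule, assumed for illustration only:
  // an incoming id is similar to the id that directly precedes it.
  def isSimilar(currentId: Int)(eventId: Int): Boolean = eventId == currentId + 1

  // Pair every incoming id with every known (currentId, globalId) mapping
  // (the product of the two datasets) and keep the pairs that are similar.
  def similarityJoin(newIds: Seq[Int], currentMappings: Seq[(Int, String)]): Map[Int, String] = {
    val matches = for {
      eventId <- newIds
      (currentId, globalId) <- currentMappings
      if isSimilar(currentId)(eventId)
    } yield eventId -> globalId
    matches.toMap
  }
}
```

Ids that match nothing are simply absent from the resulting map; those are the ones that would receive a freshly generated global id.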

Note that this is an expensive process. There are many parts that could be changed, depending on specific characteristics of the expected stream. For example, we could replace the global state RDD by a local map and apply map-side joins for a faster similarity join, but that very much depends on the expected cardinality of the set of unique ids.
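
As a rough illustration of the local-map alternative, the sketch below keeps the latest event id per global id in an in-memory map on a single JVM. The class name, its constructor parameters and the per-event (rather than per-batch) update are all assumptions made for this example, not part of the solution that follows.

```scala
import scala.collection.mutable

// Hypothetical helper: isSimilar and genGlobalId are supplied by the caller.
final class LocalGlobalIds(isSimilar: Int => Int => Boolean, genGlobalId: () => String) {
  // globalId -> most recent event id seen for that chain
  private val latest = mutable.LinkedHashMap.empty[String, Int]

  // Reuse the global id of the first chain whose latest id is similar
  // to the incoming one, otherwise start a new chain.
  def assign(eventId: Int): String = {
    val globalId = latest.collectFirst {
      case (gid, currentId) if isSimilar(currentId)(eventId) => gid
    }.getOrElse(genGlobalId())
    latest(globalId) = eventId // the chain now ends at this event id
    globalId
  }
}
```

A map like this only works while the set of global ids fits comfortably in memory, which is the cardinality caveat mentioned above.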

This was trickier than I originally expected. Take it only as a starting point towards a more robust solution. For example, the union operation on the state RDD needs regular checkpointing to keep the DAG from growing beyond control. (There's a lot of room for improvement, but that's beyond a reasonable effort to provide an answer.)

Here I sketch the core of the solution; for the complete test notebook see UniqueGlobalStateChains.snb

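import org.apache.spark.rdd.RDD

// eventStream, sparkContext, isSimilar and genGlobalId are assumed to be defined elsewhere
// this mutable reference points to the `states` that we keep across iterations
// each entry is (globalId, (eventId, timestamp))
@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD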

// we assume an incoming Event stream. Here we prepare it for the global id-process 
@transient val eventsById = eventStream.map(event => (event.id, event))
@transient val groupedEvents = eventsById.groupByKey()

// this is the core of the solution. 
// We transform the incoming events into tagged events. 
// As a by-product, the mutable `states` reference will get updated with the latest state mapping. 
// the "chain" of events can be reconstructed ordering the states by timestamp

@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) => 
    // keep only the most recent (eventId, timestamp) entry for each global id
    val currentTransitions = states.reduceByKey{case (event1, event2) => Seq(event1, event2).maxBy{case (id, ts) => ts}}
    val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTs)) => (currentId, globalId)}

    val newEventIds = events.keys // let's extract the ids of the incoming (grouped) events
    val similarityJoinMap = newEventIds.cartesian(currentMappings)
        .collect{case (eventId, (currentId, globalId)) if (isSimilar(currentId)(eventId)) => (eventId, globalId)}
        .collectAsMap
    //val similarityBC = sparkContext.broadcast(similarityJoinMap)                   
    val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId())))
    newGlobalKeys.cache() //avoid lazy evaluation to generate multiple global ids

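    // `eventGroup` holds the events sharing one id (renamed to avoid shadowing the `events` RDD)
    val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (eventGroup, globalKey)) =>
        eventGroup.map(event => (event.id, event.payload, globalKey))
    }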
    val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))}
    // swap in the new state: release the old cached RDD, then prepend the new entries
    states.unpersist(false)
    states = newStates.union(states)
    states.cache()
    newTaggedEvents
    }

Given this input sequence:

"1|a,1|b,3|c",  "2|d,2|e,2|f", "3|g,3|h,3|i,4|j", "5|k", "4|f,1|g", "6|h"

We get:

Tagged Events with a global id:

---
1|a: gen-4180,1|b: gen-4180,3|c: gen-5819
---
2|d: gen-4180,2|e: gen-4180,2|f: gen-4180
---
3|g: gen-4180,3|h: gen-4180,3|i: gen-4180,4|j: gen-5819
---
5|k: gen-5819
---
1|g: gen-2635,4|f: gen-4180
---
6|h: gen-5819

And we can reconstruct the chain of events that are derived from a global id:

gen-4180: 1<-2<-3<-4
gen-2635: 1
gen-5819: 3<-4<-5<-6
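
The reconstruction can be sketched as follows, again on a plain collection for brevity. It assumes the state entries have the same shape as above, (globalId, (eventId, timestamp)), and have already been brought to the driver; the object and method names are hypothetical.

```scala
object ChainSketch {
  // Group the state entries by global id, order each group by timestamp,
  // and render the event ids as a chain such as "1<-2<-3".
  def chains(states: Seq[(String, (Int, Long))]): Map[String, String] =
    states
      .groupBy { case (globalId, _) => globalId }
      .map { case (globalId, entries) =>
        val orderedIds = entries
          .map { case (_, idAndTs) => idAndTs }
          .sortBy { case (_, ts) => ts }
          .map { case (id, _) => id }
        globalId -> orderedIds.mkString("<-")
      }
}
```

If the same event id can be recorded for a chain in more than one batch, you would probably want to deduplicate the ids before joining them.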

-o-
