Hasan Can Saral - 1 year ago
Scala Question

How to remove duplicates (more like filter based on multiple properties) with Spark RDD in Scala?

As a policy, we do not update our documents; instead, we recreate them with updated values. When I process the events, I would like to keep only the most recent version of each document, so I need to filter items out of my RDD based on multiple properties. For instance, say an item looks like this:

"name": "Sample",
"someId": "123",
"createdAt": "2016-09-21T02:16:32+00:00"

and when it is updated:

"name": "Sample-Updated",
"someId": "123", # This remains the same
"createdAt": "2016-09-21T03:16:32+00:00" # This is later than the one above, since the update happens after the original document is created

What I have been doing is:

items = items.toList.

items = items

but this obviously converts the RDD into a list, which defeats the purpose of using Spark. How do I achieve the same thing on the RDD itself?


So far, following the suggestions in the comments, I have come up with the code below, but I am stuck on adding to the set:

// Is this correct? (1)
val initialSet = sc.parallelize(List[(String, Event)]())

val addToSet = (eventSet: RDD[(String, Event)],
                event: Event) => {
  // What to do here? (2)
}

// Is this correct? (3)
val mergeSets = (p1: RDD[(String, Event)],
p2: RDD[(String, Event)]) => p1.union(p2)

// resultSet is of type RDD[(String, RDD[(String, Event)])]. How to get it as RDD[(String, Event)]? (4)
val resultSet = initialSet.aggregateByKey(initialSet)(addToSet, mergeSets)

Answer Source

You should be able to use reduceByKey here:

  items
    .keyBy(_.someId)
    .reduceByKey((x, y) => if (x.createdAt > y.createdAt) x else y)
    .values

where the initial keyBy creates (id, object) pairs, reduceByKey keeps the most recent object for each id, and values drops the keys.
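
For completeness, here is a self-contained sketch of the keyBy / reduceByKey / values chain described above. The `Event` case class, the `LatestEvents` object, the `isNewer` and `latestPerId` helpers, and the local master setting are all assumptions made for illustration; only the field names come from the sample document in the question. It also assumes `createdAt` is stored as an ISO-8601 string and parses it before comparing.

```scala
import java.time.OffsetDateTime

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical event type; field names follow the sample document in the question.
case class Event(name: String, someId: String, createdAt: String)

object LatestEvents {
  // Parse the ISO-8601 timestamps so events are compared as points in time,
  // not as raw strings.
  def isNewer(a: Event, b: Event): Boolean =
    OffsetDateTime.parse(a.createdAt).isAfter(OffsetDateTime.parse(b.createdAt))

  // Keep exactly one event per someId: the one with the latest createdAt.
  def latestPerId(items: RDD[Event]): RDD[Event] =
    items
      .keyBy(_.someId)                                    // (someId, event)
      .reduceByKey((x, y) => if (isNewer(x, y)) x else y) // newest event per key
      .values                                             // drop the keys again

  def main(args: Array[String]): Unit = {
    // Local master is an assumption so the sketch can run without a cluster.
    val conf = new SparkConf().setAppName("latest-events").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val items = sc.parallelize(Seq(
        Event("Sample", "123", "2016-09-21T02:16:32+00:00"),
        Event("Sample-Updated", "123", "2016-09-21T03:16:32+00:00"),
        Event("Other", "456", "2016-09-21T01:00:00+00:00")
      ))
      // Output order is not guaranteed after a shuffle.
      latestPerId(items).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

With this input, the result should contain the "Sample-Updated" event for id 123 and the "Other" event for id 456. Comparing `createdAt` as plain strings, as in the one-liner, only orders events correctly when every timestamp uses the same format and offset, which is why this sketch parses them first. As for the aggregateByKey attempt in the question: its zero value and accumulator would have to be plain values (for example an `Option[Event]`) rather than RDDs, because Spark does not support nesting RDDs or using one RDD inside another RDD's transformation. reduceByKey avoids the need for a zero value altogether.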
