
How to efficiently delete subsets in a Spark RDD

In the course of my research, I have found it somewhat difficult to delete all the subsets in a Spark RDD.

The data structure is RDD[(key, set)]. For example, it could be:

RDD[ ("peter", Set(1,2,3)), ("mike", Set(1,3)), ("jack", Set(5)) ]


Since mike's set (Set(1,3)) is a subset of peter's (Set(1,2,3)), I want to delete "mike", which would leave:

RDD[ ("peter", Set(1,2,3)), ("jack", Set(5)) ]


It is easy to implement locally in Python with two nested for loops, as sketched below. But when I want to scale this out to a cluster with Scala and Spark, it is not that easy to find a good solution.
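Roughly, the local double-loop version would look like this in Scala (a sketch against the example data above, not code I have run on a cluster):

// Naive local version: keep an entry only if no other entry's set
// strictly contains it. O(n^2) pairwise comparisons, like the nested loops.
val entries = Seq(("peter", Set(1, 2, 3)), ("mike", Set(1, 3)), ("jack", Set(5)))
val kept = entries.filter { case (name, set) =>
  !entries.exists { case (otherName, otherSet) =>
    otherName != name && set.subsetOf(otherSet) && (otherSet -- set).nonEmpty
  }
}
// kept: List((peter,Set(1, 2, 3)), (jack,Set(5)))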

Thanks

Answer

I doubt we can escape comparing each element to every other element (the equivalent of a double loop in a non-distributed algorithm). The subset relation between sets is not symmetric, meaning that we need to check both whether "alice" is a subset of "bob" and whether "bob" is a subset of "alice".
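For instance, with Scala's standard Set:

Set(1, 3).subsetOf(Set(1, 2, 3))  // true:  the smaller set is contained in the larger
Set(1, 2, 3).subsetOf(Set(1, 3))  // false: the reverse direction is a separate check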

To do this using the Spark API, we can resort to multiplying the data with itself using a cartesian product and verifying each entry of the resulting matrix:

val data = Seq(("peter", Set(1,2,3)), ("olga", Set(1,2,3)), ("mike", Set(1,3)), ("anne", Set(7)), ("jack", Set(5,4,1)), ("lizza", Set(5,1)), ("bart", Set(5,4)), ("maggie", Set(5)))
// expected result from this dataset: peter, olga, anne, jack
val userSet = sparkContext.parallelize(data)
// pair every entry with every other entry (including itself)
val prod = userSet.cartesian(userSet)
// collect the pairs where set2 is a strict subset of set1: those (name2, set2) entries must be removed
val subsetMembers = prod.collect { case ((name1, set1), (name2, set2)) if (name1 != name2) && set2.subsetOf(set1) && (set1 -- set2).nonEmpty => (name2, set2) }
val superset = userSet.subtract(subsetMembers)

// let's see the results:
superset.collect()
// Array[(String, scala.collection.immutable.Set[Int])] = Array((olga,Set(1, 2, 3)), (peter,Set(1, 2, 3)), (anne,Set(7)), (jack,Set(5, 4, 1)))
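Note the (set1 -- set2).nonEmpty guard: it restricts matches to strict subsets, so entries whose sets are identical (peter and olga above) do not eliminate each other. Also keep in mind that cartesian materializes all n^2 pairs, so this distributes the double loop rather than avoiding it; for large datasets the shuffle cost can be significant.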