Null-Hypothesis - 1 month ago
Scala Question

Stratified sampling in Spark

I have a data set which contains user and purchase data. Here is an example, where the first element is the userId, the second is the productId, and the third is a boolean flag.

(2147481832,23355149,1)
(2147481832,973010692,1)
(2147481832,2134870842,1)
(2147481832,541023347,1)
(2147481832,1682206630,1)
(2147481832,1138211459,1)
(2147481832,852202566,1)
(2147481832,201375938,1)
(2147481832,486538879,1)
(2147481832,919187908,1)
...


I want to take exactly 80% of each user's data and build one RDD, and take the remaining 20% to build another RDD; let's call them train and test. I would like to stay away from groupBy, since it can create memory problems when the data set is large. What's the best way to do this?

I could do the following, but it will not give 80% of each user's data:

val percentData = data.map(x => ((math.random * 100).toInt, (x._1, x._2, x._3)))
val train = percentData.filter(x => x._1 < 80).values.repartition(10).cache()
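Similarly, RDD.randomSplit gives a clean global 80/20 split (a sketch, assuming a fixed seed for reproducibility), but it does not stratify per user either:

val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42L)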

Answer

One possibility is in Holden's answer, and here is another one:

You can use the sampleByKeyExact transformation, from the PairRDDFunctions class, which is still marked experimental as of Spark 1.4.1:

sampleByKeyExact(boolean withReplacement, scala.collection.Map fractions, long seed) ::Experimental:: Return a subset of this RDD sampled by key (via stratified sampling) containing exactly math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).

And this is how I would do it.

Consider the following list:

val list = List(
  (2147481832, 23355149, 1),
  (2147481832, 973010692, 1),
  (2147481832, 2134870842, 1),
  (2147481832, 541023347, 1),
  (2147481832, 1682206630, 1),
  (2147481832, 1138211459, 1),
  (2147481832, 852202566, 1),
  (2147481832, 201375938, 1),
  (2147481832, 486538879, 1),
  (2147481832, 919187908, 1),
  (214748183, 919187908, 1),
  (214748183, 91187908, 1)
)

I would create a pair RDD, mapping all the users as keys:

val data = sc.parallelize(list).map(x => (x._1, (x._2, x._3)))

As you've noticed, the fractions argument of sampleByKeyExact takes a Map with a fraction for each key, so I'll set it up as follows:

val fractions = data.map(_._1).distinct.map(x => (x,0.8)).collectAsMap

What I have done here is map over the keys to find the distinct ones, associate each key with a fraction equal to 0.8, and then collect the whole thing as a Map.
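For the list above, this yields (hypothetical REPL output; key order may vary):

scala> fractions
res0: scala.collection.Map[Int,Double] = Map(2147481832 -> 0.8, 214748183 -> 0.8)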

Now, to sample, all I have to do is:

// In Spark 1.3+ the pair-RDD implicits are already in scope; this import is optional.
import org.apache.spark.rdd.PairRDDFunctions
val sampleData = data.sampleByKeyExact(false, fractions, 2L)

or

val sampleData = data.sampleByKeyExact(withReplacement = false, fractions = fractions, seed = 2L)

You can check the count on your keys, or on the data and the sample:

scala> data.count
[...]
res10: Long = 12

scala> sampleData.count
[...]
res11: Long = 10

A count of 10 is expected here: the first user has 10 records and the second has 2, so the exact sampler keeps math.ceil(10 * 0.8) = 8 and math.ceil(2 * 0.8) = 2 records per stratum.
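As a complement (a sketch, not part of the original answer): countByKey lets you verify the per-key proportions, and subtracting the sample from the full data yields the remaining ~20% as a test set. Note that countByKey collects a Map to the driver, so it only makes sense for a small number of distinct keys, and that subtract matches on exact pair equality, so exact duplicate records would be removed together.

// Per-key counts (map order may vary):
data.countByKey        // Map(2147481832 -> 10, 214748183 -> 2)
sampleData.countByKey  // Map(2147481832 -> 8, 214748183 -> 2)

// Train/test split: the sample is the training set, its complement the test set.
val train = sampleData
val test = data.subtract(sampleData)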