Laurens - 1 month ago
Scala Question

Spark filter and count big RDD multiple times

Say I have an RDD[(String, Int)] like the following example:

(A, 0)
(B, 0)
(C, 1)
(D, 0)
(E, 2)
(F, 1)
(G, 1)
(H, 3)
(I, 2)
(J, 0)
(K, 3)


I want to print the total number of records that contain 0, 1, 2, etc.
Since the RDD contains millions of entries, I would like to do this as efficiently as possible.

The output of this example would return something like:

Number of records containing 0 = 4
Number of records containing 1 = 3
Number of records containing 2 = 2
Number of records containing 3 = 2


Currently I implement this by performing a filter on the big RDD and then calling count() for 0, 1, 2, ... separately. I am using Scala.

Is there a more efficient way to do this? I already cache the RDD, but my program still runs out of memory (I have set the driver memory to 5G).

EDIT:
As suggested by Tzach, I now use countByKey:

rdd.map(_.swap).countByKey()


Can I refine this by changing the string value to a tuple (where the 2nd element is either "m" or "f"), and then obtain the counts per key per unique value of the 2nd element of this tuple?

For example:

((A,m), 0)
((B,f), 0)
((C,m), 1)
((D,m), 0)
((E,f), 2)
((F,f), 1)
((G,m), 1)
((H,m), 3)
((I,f), 2)
((J,f), 0)
((K,m), 3)


Would result in

((0,m), 2)
((0,f), 2)
((1,m), 2)
((1,f), 1)
((2,m), 0)
((2,f), 2)
((3,m), 2)
((3,f), 0)


Thanks in advance!

Answer

You can use countByKey for exactly that. Swap the elements of each pair beforehand so that the numeric value becomes the key:

val rdd = sc.parallelize(Seq(
  ("A", 0), ("B", 0), ("C", 1), ("D", 0), ("E", 2),
  ("F", 1), ("G", 1), ("H", 3), ("I", 2), ("J", 0), ("K", 3)
))

// countByKey returns a local Map on the driver, so the print order is not guaranteed
rdd.map(_.swap).countByKey().foreach(println)
// (0,4)
// (1,3)
// (3,2)
// (2,2)
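
Since the question mentions driver memory: countByKey collects its result into a local Map on the driver, which is fine for a handful of distinct values like here. If the number of distinct keys were large, a variant that keeps the counts distributed could look like the sketch below. It uses reduceByKey instead of countByKey; the object name, the helper countPerValue and the local[*] master are assumptions made for illustration only.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object, not part of the original answer.
object CountPerValue {

  // Emit (value, 1L) for every record and sum per value.
  // The result stays an RDD, so nothing is pulled to the driver here.
  def countPerValue(rdd: RDD[(String, Int)]): RDD[(Int, Long)] =
    rdd.map { case (_, n) => (n, 1L) }.reduceByKey(_ + _)

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, just to make the sketch runnable.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("count-per-value")
      .getOrCreate()
    val sc = spark.sparkContext

    val rdd = sc.parallelize(Seq(
      ("A", 0), ("B", 0), ("C", 1), ("D", 0), ("E", 2),
      ("F", 1), ("G", 1), ("H", 3), ("I", 2), ("J", 0), ("K", 3)
    ))

    // Only the (small) sorted result is collected for printing.
    countPerValue(rdd).sortByKey().collect().foreach { case (n, c) =>
      println(s"Number of records containing $n = $c")
    }
    // Number of records containing 0 = 4
    // Number of records containing 1 = 3
    // Number of records containing 2 = 2
    // Number of records containing 3 = 2

    spark.stop()
  }
}
```

Both variants make a single pass over the data, in contrast to one filter plus count() per value.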

EDIT: countByKey does exactly what it sounds like: it counts the records per key. So whatever you want to count by, transform your RDD so that it becomes the first element of each pair, e.g.:

rdd.map { case ((a, b), i) => ((i, b), a) }.countByKey()

or:

rdd.keyBy { case ((_, b), i) => (i, b) }.countByKey()
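
Note that counting by key only reports combinations that actually occur, so entries such as ((2,m), 0) and ((3,f), 0) from the desired output would be missing. A minimal sketch of one way to fill them in, assuming the possible numeric values and the "m"/"f" labels are known up front (the object name and the helper countsWithZeros are hypothetical):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object, not part of the original answer.
object CountPerValueAndLabel {

  // Count records per (value, label) and add a 0 for every
  // combination that does not occur in the data.
  def countsWithZeros(
      rdd: RDD[((String, String), Int)],
      values: Seq[Int],
      labels: Seq[String]): Seq[((Int, String), Long)] = {
    // countByValue returns a local Map on the driver.
    val counts = rdd.map { case ((_, b), i) => (i, b) }.countByValue()
    for (i <- values; b <- labels)
      yield ((i, b), counts.getOrElse((i, b), 0L))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, just to make the sketch runnable.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("count-per-value-and-label")
      .getOrCreate()
    val sc = spark.sparkContext

    val rdd = sc.parallelize(Seq(
      (("A", "m"), 0), (("B", "f"), 0), (("C", "m"), 1), (("D", "m"), 0),
      (("E", "f"), 2), (("F", "f"), 1), (("G", "m"), 1), (("H", "m"), 3),
      (("I", "f"), 2), (("J", "f"), 0), (("K", "m"), 3)
    ))

    countsWithZeros(rdd, 0 to 3, Seq("m", "f")).foreach(println)
    // ((0,m),2)
    // ((0,f),2)
    // ((1,m),2)
    // ((1,f),1)
    // ((2,m),0)
    // ((2,f),2)
    // ((3,m),2)
    // ((3,f),0)

    spark.stop()
  }
}
```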