Eugene Cuz - 1 year ago 69

# How to count the number of occurrences of an element with Scala/Spark?

I had a file that contained a list of elements like this:

``````
00|905000|20160125204123|79644809999||HGMTC|1||22|7905000|56321647569|||34110|I||||||250995210056537|354805064211510||56191|||38704||A|||11|V|81079681404134|5||||SE|||G|144|||||||||||||||Y|b00534589.huawei_anadyr.20151231184912||1|||||79681404134|0|||+@@+1{79098509982}2{2}3{2}5{79644809999}6{0000002A7A5AC635}7{79681404134}|20160125|
``````

Through a series of steps, I managed to convert it to a list of elements like this:

``````
(902996760100000,CompactBuffer(6, 5, 2, 2, 8, 6, 5, 3))
``````

Here 905000 and 902996760100000 are keys, and 6, 5, 2, 2, 8, 6, 5, 3 are values. Values can be numbers from 1 to 8. Is there a way to count the number of occurrences of each value using Spark, so that the result looks like this?

``````
(902996760100000, 0_1, 2_2, 1_3, 0_4, 2_5, 2_6, 0_7, 1_8)
``````

I could do it with if/else blocks and stuff, but that wouldn't be pretty, so I wondered whether there are any tools I could use in Scala/Spark.

This is my code.

``````
class ScalaJob(sc: SparkContext) {
  def run(cdrPath: String): RDD[(String, Iterable[String])] = {
    // pass the file
    val fileCdr = sc.textFile(cdrPath)

    // find values in every raw cdr
    val valuesCdr = fileCdr.map { dataRaw =>
      val p = dataRaw.split("[|]", -1)
      (p(1), ScalaJob.processType(ScalaJob.processTime(p(2)) + "_" + p(32)))
    }
    val x = valuesCdr.groupByKey()
    return x
  }
}
``````

Any advice on optimizing it would be appreciated. I'm really new to Scala/Spark.

First, Scala is a type-safe language, and so is Spark's RDD API, so it's highly recommended to use the type system instead of working around it by "encoding" everything into Strings.

So I'll suggest a solution that creates an `RDD[(String, Seq[(Int, Int)])]` (with the second item in the tuple being a sequence of (ID, count) tuples) rather than an `RDD[(String, Iterable[String])]`, which seems less useful.

Here's a simple function that counts the occurrences of 1 to 8 in a given `Iterable[Int]`:

``````
def countValues(l: Iterable[Int]): Seq[(Int, Int)] = {
  (1 to 8).map(i => (i, l.count(_ == i)))
}
``````
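As a quick sanity check, here is a minimal sketch that runs this function on the sample values from the question, without Spark. The `CountValuesDemo` object and the `render` helper are hypothetical names added for illustration; `render` only exists to show how the typed pairs could be turned back into the `count_value` layout the question asked for.

```scala
object CountValuesDemo {
  // Same function as above: for each value 1..8, count how often it occurs.
  def countValues(l: Iterable[Int]): Seq[(Int, Int)] =
    (1 to 8).map(i => (i, l.count(_ == i)))

  // Hypothetical helper: renders (value, count) pairs as "count_value" items.
  def render(key: String, counts: Seq[(Int, Int)]): String =
    counts
      .map { case (value, count) => s"${count}_$value" }
      .mkString(s"($key, ", ", ", ")")

  def main(args: Array[String]): Unit = {
    val sample = Seq(6, 5, 2, 2, 8, 6, 5, 3)
    println(render("902996760100000", countValues(sample)))
    // prints (902996760100000, 0_1, 2_2, 1_3, 0_4, 2_5, 2_6, 0_7, 1_8)
  }
}
```

Keeping the counts as `(Int, Int)` pairs until the very end means the string formatting stays a presentation detail rather than part of the data model.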

You can use `mapValues` with this function (place the function in the object for serializability, like you did with the rest) on an `RDD[(String, Iterable[Int])]` to get the result:

``````
valuesCdr.groupByKey().mapValues(ScalaJob.countValues)
``````
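As an aside on the optimization part of the question: `groupByKey` moves every single value across the network before anything is counted. A possible alternative, not part of the approach above, is `aggregateByKey`, which builds partial counts inside each partition and only shuffles those. The sketch below assumes the values are already `Int`s in the range 1 to 8; `CountSketch` and all of its member names are hypothetical.

```scala
import org.apache.spark.rdd.RDD

object CountSketch {
  // One counter per possible value 1..8 (index = value - 1).
  val zero: Vector[Int] = Vector.fill(8)(0)

  // Fold a single value into the counters of its key.
  // Assumption: v is in 1..8; anything else throws IndexOutOfBoundsException.
  def addValue(acc: Vector[Int], v: Int): Vector[Int] =
    acc.updated(v - 1, acc(v - 1) + 1)

  // Merge counters that were built on different partitions.
  def merge(a: Vector[Int], b: Vector[Int]): Vector[Int] =
    a.zip(b).map { case (x, y) => x + y }

  // Pair each counter with the value it belongs to: (value, count).
  def label(counts: Vector[Int]): Seq[(Int, Int)] =
    (1 to 8).zip(counts)

  def countPerKey(values: RDD[(String, Int)]): RDD[(String, Seq[(Int, Int)])] =
    values
      .aggregateByKey(zero)(addValue, merge)
      .mapValues(label)
}
```

Whether this is noticeably faster depends on how many values each key has; for small groups the `groupByKey` version is simpler and probably good enough.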

The entire solution can then be simplified a bit:

``````
class ScalaJob(sc: SparkContext) {
  import ScalaJob._

  def run(cdrPath: String): RDD[(String, Seq[(Int, Int)])] = {
    val valuesCdr = sc.textFile(cdrPath)
      // limit -1 keeps trailing empty fields, as in the question's code
      .map(_.split("\\|", -1))
      // key: field 1; value: type derived from the timestamp and field 32
      .map(p => (p(1), processType(processTime(p(2)), p(32))))

    valuesCdr.groupByKey().mapValues(countValues)
  }
}

object ScalaJob {
  val dayParts = Map((6 to 11) -> 1, (12 to 18) -> 2, (19 to 23) -> 3, (0 to 5) -> 4)

  def processTime(s: String): Int = {
    val hour = DateTime.parse(s, DateTimeFormat.forPattern("yyyyMMddHHmmss")).getHourOfDay