Eugene Cuz - 1 month ago
Scala Question

How to count the number of occurrences of an element with Scala/Spark?

I had a file that contained a list of elements like this

00|905000|20160125204123|79644809999||HGMTC|1||22|7905000|56321647569|||34110|I||||||250995210056537|354805064211510||56191|||38704||A|||11|V|81079681404134|5||||SE|||G|144|||||||||||||||Y|b00534589.huawei_anadyr.20151231184912||1|||||79681404134|0|||+@@+1{79098509982}2{2}3{2}5{79644809999}6{0000002A7A5AC635}7{79681404134}|20160125|


Through a series of steps, I managed to convert it to a list of elements like this

(902996760100000,CompactBuffer(6, 5, 2, 2, 8, 6, 5, 3))


Where 905000 and 902996760100000 are keys and 6, 5, 2, 2, 8, 6, 5, 3 are values. Values can be numbers from 1 to 8. Is there a way to count the number of occurrences of these values using Spark, so the result looks like this?

(902996760100000, 0_1, 2_2, 1_3, 0_4, 2_5, 2_6, 0_7, 1_8)


I could do it with if-else blocks and such, but that wouldn't be pretty, so I wondered whether there are any instruments in Scala/Spark I could use.

This is my code.

class ScalaJob(sc: SparkContext) {
  def run(cdrPath: String): RDD[(String, Iterable[String])] = {
    // parse the file
    val fileCdr = sc.textFile(cdrPath)

    // find the values in every raw CDR
    val valuesCdr = fileCdr.map { dataRaw =>
      val p = dataRaw.split("[|]", -1)
      (p(1), ScalaJob.processType(ScalaJob.processTime(p(2)) + "_" + p(32)))
    }
    val x = valuesCdr.groupByKey()
    return x
  }
}


Any advice on optimizing it would be appreciated. I'm really new to scala/spark.

Answer

First, Scala is a type-safe language, and so is Spark's RDD API; it's therefore highly recommended to use the type system instead of going around it by "encoding" everything into Strings.

So I'll suggest a solution that creates an RDD[(String, Seq[(Int, Int)])] (with the second item in each tuple being a sequence of (ID, count) pairs) rather than an RDD[(String, Iterable[String])], which seems less useful.

Here's a simple function that counts the occurrences of 1 to 8 in a given Iterable[Int]:

def countValues(l: Iterable[Int]): Seq[(Int, Int)] = {
  (1 to 8).map(i => (i, l.count(_ == i)))
}
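For instance, applied to the sample buffer from the question (a minimal pure-Scala check, no Spark needed; `result` is just an illustrative name):

```scala
def countValues(l: Iterable[Int]): Seq[(Int, Int)] =
  (1 to 8).map(i => (i, l.count(_ == i)))

// The CompactBuffer contents from the question, as a plain Seq.
val result = countValues(Seq(6, 5, 2, 2, 8, 6, 5, 3))
// result: Vector((1,0), (2,2), (3,1), (4,0), (5,2), (6,2), (7,0), (8,1))
```

Note that values absent from the input (1, 4 and 7 here) still get an explicit zero count, matching the desired `0_1, ..., 0_4, ..., 0_7` output.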

You can use mapValues with this function (place the function in the object for serializability, like you did with the rest) on an RDD[(String, Iterable[Int])] to get the result:

valuesCdr.groupByKey().mapValues(ScalaJob.countValues)

The entire solution can then be simplified a bit:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

class ScalaJob(sc: SparkContext) {
  import ScalaJob._

  def run(cdrPath: String): RDD[(String, Seq[(Int, Int)])] = {
    val valuesCdr = sc.textFile(cdrPath)
      .map(_.split("\\|", -1)) // -1 keeps trailing empty fields
      .map(p => (p(1), processType(processTime(p(2)), p(32))))

    valuesCdr.groupByKey().mapValues(countValues)
  }
}
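One caveat when simplifying the split: the question's code used `split("[|]", -1)`, and the `-1` limit matters, because Java's `String.split` drops trailing empty strings by default, and CDR lines like the sample end with `|`. A quick check of the difference:

```scala
// Java's String.split drops trailing empty strings unless given a negative limit.
val line = "a|b||d|"
val kept    = line.split("\\|", -1) // Array("a", "b", "", "d", "")
val dropped = line.split("\\|")     // Array("a", "b", "", "d") -- trailing "" removed
```

For this input it happens not to matter (field 32 sits well before the end of an ~80-field record), but keeping `-1` is the safer default.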

object ScalaJob {
  val dayParts = Map((6 to 11) -> 1, (12 to 18) -> 2, (19 to 23) -> 3, (0 to 5) -> 4)

  def processTime(s: String): Int = {
    val hour = DateTime.parse(s, DateTimeFormat.forPattern("yyyyMMddHHmmss")).getHourOfDay
    dayParts.filterKeys(_.contains(hour)).values.head
  }

  // assumes the type field is always "S" or "V"; anything else throws a MatchError
  def processType(dayPart: Int, s: String): Int = s match {
    case "S" => 2 * dayPart - 1
    case "V" => 2 * dayPart
  }

  def countValues(l: Iterable[Int]): Seq[(Int, Int)] = {
    (1 to 8).map(i => (i, l.count(_ == i)))
  }
}
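The day-part lookup and the type encoding can also be exercised standalone, without the Joda-Time dependency; `dayPartOf` and `encode` below are hypothetical stand-ins for the lookup inside `processTime` and for `processType`, assuming the hour has already been extracted:

```scala
val dayParts = Map((6 to 11) -> 1, (12 to 18) -> 2, (19 to 23) -> 3, (0 to 5) -> 4)

// Find the day part whose hour range contains the given hour (0-23).
def dayPartOf(hour: Int): Int =
  dayParts.collectFirst { case (range, part) if range.contains(hour) => part }.get

// Same match as processType: "S" maps to the odd slot, "V" to the even slot.
def encode(dayPart: Int, kind: String): Int = kind match {
  case "S" => 2 * dayPart - 1
  case "V" => 2 * dayPart
}
```

So an hour of 20 falls in day part 3, and a "V" record in that part encodes to 6, which is why the counted values range over 1 to 8 (4 day parts times 2 types).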