Newbiee Newbiee - 2 months ago 19
Scala Question

Scala : How to do GroupBy sum for String values?

I have an RDD[Row]:

|---itemId----|----Country-------|---Type----------|
| 11 | US | Movie |
| 11 | US | TV |
| 101 | France | Movie |


How can I group by itemId and save the result as a list of JSON, where each row of the resulting RDD is a separate JSON object:

{"itemId" : 11,
"Country": {"US" :2 },"Type": {"Movie" :1 , "TV" : 1} },
{"itemId" : 101,
"Country": {"France" :1 },"Type": {"Movie" :1} }


I tried:

import com.mapping.data.model.MappingUtils
import com.mapping.data.model.CountryInfo


val mappingPath = "s3://.../"
val input = sc.textFile(mappingPath)


The input is a list of JSON objects, one per line. I map each line to the POJO class CountryInfo using MappingUtils, which takes care of JSON parsing and conversion:

val MappingsList = input.map(x=> {
val countryInfo = MappingUtils.getCountryInfoString(x);
(countryInfo.getItemId(), countryInfo)
}).collectAsMap

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo]


def showCountryInfo(x: Option[CountryInfo]) = x match {
case Some(s) => s
}


val events = sqlContext.sql("select itemId from EventList")

val itemList = events.map(row => {
val itemId = row.getAs[String]("itemId")
val countryInfo = showCountryInfo(MappingsList.get(itemId))
val country = if (countryInfo.getCountry() == "unknown") "US" else countryInfo.getCountry()
val tpe = countryInfo.getType() // `type` is a reserved word in Scala

Row(itemId, country, tpe)
})


Can someone let me know how I can achieve this?

Thank You!

Answer

I can't afford the extra time to complete this, but can give you a start.

The idea is that you aggregate the RDD[Row] down into a single Map that represents your JSON structure. Aggregation is a fold that requires two function parameters:

  1. seqOp: how to fold the elements of a partition into the target type.
  2. combOp: how to merge two values of the target type.
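To make the two roles concrete, here is a minimal sketch that mimics the shape of aggregate on plain Scala collections, with each inner Seq standing in for one partition. The helper aggregateLocally is a made-up name for illustration, not a Spark API, and the real scheduling and ordering in Spark are not modelled.

```scala
// Local stand-in for rdd.aggregate(zero)(seqOp, combOp).
// Each inner Seq plays the role of one partition; no Spark is needed to run this.
object AggregateSketch {
  // Hypothetical helper for illustration only (not part of Spark).
  def aggregateLocally[A, B](partitions: Seq[Seq[A]], zero: B)(
      seqOp: (B, A) => B,
      combOp: (B, B) => B): B =
    partitions
      .map(_.foldLeft(zero)(seqOp)) // seqOp folds one partition's elements into a B
      .foldLeft(zero)(combOp)       // combOp merges the per-partition results

  // Example: count how often each country occurs across two "partitions".
  val partitions: Seq[Seq[String]] = Seq(Seq("US", "US"), Seq("France", "US"))

  val counts: Map[String, Int] =
    aggregateLocally(partitions, Map.empty[String, Int])(
      (acc, c) => acc.updated(c, acc.getOrElse(c, 0) + 1),
      (l, r) => r.foldLeft(l) { case (acc, (c, n)) => acc.updated(c, acc.getOrElse(c, 0) + n) }
    )
}
```

Note that seqOp only ever sees single elements, while combOp only ever sees already-folded accumulators, which is why the two need different logic.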

The tricky part comes in combOp while merging, as you need to accumulate the counts of values seen in the seqOp. I have left this as an exercise, as I have a plane to catch! Hopefully someone else can fill in the gaps if you have trouble.
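For anyone picking up that exercise, one possible way to do the merge is sketched below. It assumes the accumulator has the shape itemId -> (country counts, type counts); the names Counts, Summary and mergeCounts are introduced here for illustration only.

```scala
object CombOpSketch {
  // Assumed accumulator shape: itemId -> (country counts, type counts).
  type Counts  = Map[String, Int]
  type Summary = Map[Int, (Counts, Counts)]

  // Add the counts of `b` into `a`; keys present in only one map are kept as-is.
  def mergeCounts(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (key, n)) => acc.updated(key, acc.getOrElse(key, 0) + n) }

  // Start from `l` and fold `r` into it, so ids that occur on only one side survive.
  def combOp(l: Summary, r: Summary): Summary =
    r.foldLeft(l) { case (acc, (id, (countries, types))) =>
      val merged = acc.get(id) match {
        case None => (countries, types)
        case Some((accCountries, accTypes)) =>
          (mergeCounts(accCountries, countries), mergeCounts(accTypes, types))
      }
      acc.updated(id, merged)
    }
}
```

Because adding counts is commutative and associative, the order in which partial results are combined should not change the outcome.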

  import org.apache.spark.rdd.RDD

  case class Row(id: Int, country: String, tpe: String)

  def foo: Unit = {

    val rows: RDD[Row] = ???

    // seqOp: fold one Row into the accumulator, bumping the counts for its itemId.
    def seqOp(acc: Map[Int, (Map[String, Int], Map[String, Int])], r: Row) = {
      acc.get(r.id) match {
        case None => acc.updated(r.id, (Map(r.country -> 1), Map(r.tpe -> 1)))
        case Some((countries, types)) =>
          val countries_ = countries.updated(r.country, countries.getOrElse(r.country, 0) + 1)
          val types_ = types.updated(r.tpe, types.getOrElse(r.tpe, 0) + 1)
          acc.updated(r.id, (countries_, types_))
      }
    }

    val z = Map.empty[Int, (Map[String, Int], Map[String, Int])]

    // combOp: merge two accumulators. The fold starts from r (not z) so that
    // ids present only in r are not dropped.
    def combOp(l: Map[Int, (Map[String, Int], Map[String, Int])], r: Map[Int, (Map[String, Int], Map[String, Int])]) = {
      l.foldLeft(r) { case (acc, (id, (countries, types))) =>
          r.get(id) match {
            case None => acc.updated(id, (countries, types))
            case Some((otherCountries, otherTypes)) =>
              // todo - continue by merging countries with otherCountries
              // and types with otherTypes, then update acc
              ??? // placeholder so the block compiles
          }
      }
    }

    val summaryMap = rows.aggregate(z)(seqOp, combOp)
  }
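Once the summary map exists, turning it into the JSON shape from the question could look roughly like the sketch below. It builds the strings by hand to stay dependency-free; the names quote, countsToJson and toJsonLines are made up for this example, and a proper JSON library would be the safer choice for real data.

```scala
object JsonSketch {
  type Counts = Map[String, Int]

  // Minimal escaping for JSON string keys (backslash and double quote only).
  def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Render one count map as a JSON object; keys are sorted for stable output.
  def countsToJson(counts: Counts): String =
    counts.toSeq.sortBy(_._1)
      .map { case (k, n) => s"${quote(k)}: $n" }
      .mkString("{", ", ", "}")

  // One JSON object per itemId, in ascending itemId order.
  def toJsonLines(summary: Map[Int, (Counts, Counts)]): Seq[String] =
    summary.toSeq.sortBy(_._1).map { case (id, (countries, types)) =>
      s"""{"itemId": $id, "Country": ${countsToJson(countries)}, "Type": ${countsToJson(types)}}"""
    }
}
```

With the sample data from the question, this should give one line for itemId 11 and one for itemId 101, which can then be written out line by line.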