sandip sandip - 3 months ago 8
Scala Question

Read individual elements of a tuple from a map((tuple),(tuple)) in Scala

The output of reduceByKey is a ShuffledRDD in which both the key and the value are tuples of multiple fields. I need to extract all the fields and write them to a Hive table.

Below is the code I tried:

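// Assign the query result so the map below has something to refer to.
val USAGE_DATA = sqlContext.sql("select SUBS_CIRCLE_ID,SUBS_MSISDN,EVENT_START_DT,RMNG_NW_OP_KEY, ACCESS_TYPE FROM FACT.FCT_MEDIATED_USAGE_DATA")
// Key: (SUBS_CIRCLE_ID, SUBS_MSISDN, EVENT_START_DT); value: (RMNG_NW_OP_KEY, ACCESS_TYPE).
// .rdd makes the DataFrame-to-RDD step explicit, so reduceByKey is available.
val USAGE_DATA_Reduce = USAGE_DATA.rdd.map { row =>
  ((row.getShort(0), row.getString(1), row.getString(2)), (row.getInt(3), row.getInt(4)))
}.reduceByKey((x, y) => (math.min(x._1, y._1), math.max(x._2, y._2)))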


The final output I expect is all five fields, as:
SUBS_CIRCLE_ID,SUBS_MSISDN,EVENT_START_DT, MINVAL, MAXVAL

so that it can be inserted directly into a Hive table.

Answer

If you mean:

Given an RDD[(TupleN, TupleM)], how do I map the elements of each record's key and value tuples into a single concatenated string?

Here's a simplified version; you should be able to extrapolate from it to solve your problem:

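import org.apache.spark.rdd.RDD  // needed for the RDD[String] annotation below

// Sample pair RDD: a 2-field key tuple mapped to a 3-field value tuple.
val keyValueRdd = sc.parallelize(Seq(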
  (1, "key1") -> (10, "value1", "A"),
  (2, "key2") -> (20, "value2", "B"),
  (3, "key3") -> (30, "value3", "C")
))

val asStrings: RDD[String] = keyValueRdd.map {
  case ((k1, k2), (v1, v2, v3)) => List(k1, k2, v1, v2, v3).mkString(",")
}

asStrings.foreach(println)
// prints, in no guaranteed order:
// 3,key3,30,value3,C
// 2,key2,20,value2,B
// 1,key1,10,value1,A
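
If you need the five fields as separate columns rather than one string, the same pattern match can return a flat tuple instead. The following is only a sketch under stated assumptions: it uses a plain Scala Seq as a stand-in for the reduced RDD so that it runs without Spark, the object name FlattenReduced and its helpers are hypothetical, and the sample values are made up.

```scala
// Stand-in for the reduced RDD: a local Seq with the same shape,
// ((SUBS_CIRCLE_ID, SUBS_MSISDN, EVENT_START_DT), (RMNG_NW_OP_KEY, ACCESS_TYPE)).
// All names and sample values here are hypothetical.
object FlattenReduced {
  type Key = (Short, String, String)
  type Value = (Int, Int)

  // Merge two values the same way the question's reduceByKey does:
  // minimum of the first field, maximum of the second.
  def merge(x: Value, y: Value): Value =
    (math.min(x._1, y._1), math.max(x._2, y._2))

  // Flatten one (key, value) record into a single five-field tuple.
  def flatten(record: (Key, Value)): (Short, String, String, Int, Int) =
    record match {
      case ((circle, msisdn, startDt), (minVal, maxVal)) =>
        (circle, msisdn, startDt, minVal, maxVal)
    }

  val rows: Seq[(Key, Value)] = Seq(
    ((1.toShort, "9000000001", "2016-01-01"), (5, 2)),
    ((1.toShort, "9000000001", "2016-01-01"), (3, 7)),
    ((2.toShort, "9000000002", "2016-01-02"), (4, 4))
  )

  // Local equivalent of reduceByKey: group by key, then combine the values.
  val reduced: Map[Key, Value] =
    rows.groupBy(_._1).map { case (k, recs) => k -> recs.map(_._2).reduce(merge) }

  // Five-field records, sorted by the first field for a stable order.
  val flat: Seq[(Short, String, String, Int, Int)] =
    reduced.toSeq.map(flatten).sortBy(_._1)
}
```

On the real RDD, passing the same `case ((circle, msisdn, startDt), (minVal, maxVal)) => ...` clause to `map` on USAGE_DATA_Reduce should give an RDD of five-field tuples in the column order SUBS_CIRCLE_ID, SUBS_MSISDN, EVENT_START_DT, MINVAL, MAXVAL.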