CnewbieWannabePro CnewbieWannabePro -4 years ago 221
Scala Question

RDD/Scala Get one column from RDD

I have an RDD[Log] file with various fields (username, content, date, bytes) and I want to find different things for each field/column.

For example, I want to get the min, max and average bytes found in the RDD. When I do:

val q1 = cleanRdd.filter(x => x.bytes != 0)


I get the full lines of the RDD with bytes != 0. But how can I actually sum them, calculate the avg, find the min/max etc? How can I take only one column from my RDD and apply transformations on it?

EDIT: Prasad suggested converting the RDD to a DataFrame, but gave no instructions on how to do so, and I can't find a solid answer on the site. Any help would be great.

EDIT: LOG class:

case class Log (username: String, date: String, status: Int, content: Int)


Running cleanRdd.take(5).foreach(println) gives something like this:

Log(199.72.81.55 ,01/Jul/1995:00:00:01 -0400,200,6245)
Log(unicomp6.unicomp.net ,01/Jul/1995:00:00:06 -0400,200,3985)
Log(199.120.110.21 ,01/Jul/1995:00:00:09 -0400,200,4085)
Log(burger.letters.com ,01/Jul/1995:00:00:11 -0400,304,0)
Log(199.120.110.21 ,01/Jul/1995:00:00:11 -0400,200,4179)

Answer Source

Well... you have a lot of questions.

So... you have the following abstraction of a Log

case class Log (username: String, date: String, status: Int, content: Int, bytes: Int)

Que - How can I take only one column from my RDD?

Ans - RDDs have a map function. For an RDD[A], map takes a transform function of type A => B and turns it into an RDD[B].

import org.apache.spark.rdd.RDD

val logRdd: RDD[Log] = ...

// keep only records with a non-zero byte count, then project the bytes field
val byteRdd = logRdd
  .filter(l => l.bytes != 0)
  .map(l => l.bytes)
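
If you want to sanity check the projection without a cluster, the same filter + map shape works on a plain Scala Seq. This is only a sketch: LogRow is a hypothetical four-field stand-in for the Log class (byte count in a field named bytes), and the rows are the five sample records printed in the question.

```scala
object ColumnSketch {
  // Hypothetical stand-in for Log, with the byte count in a field named `bytes`.
  final case class LogRow(username: String, date: String, status: Int, bytes: Int)

  // The five sample records shown in the question.
  val rows: Seq[LogRow] = Seq(
    LogRow("199.72.81.55", "01/Jul/1995:00:00:01 -0400", 200, 6245),
    LogRow("unicomp6.unicomp.net", "01/Jul/1995:00:00:06 -0400", 200, 3985),
    LogRow("199.120.110.21", "01/Jul/1995:00:00:09 -0400", 200, 4085),
    LogRow("burger.letters.com", "01/Jul/1995:00:00:11 -0400", 304, 0),
    LogRow("199.120.110.21", "01/Jul/1995:00:00:11 -0400", 200, 4179)
  )

  // Same shape as the RDD version: drop zero-byte records, keep one column.
  val bytesColumn: Seq[Int] = rows.filter(_.bytes != 0).map(_.bytes)
}
```

The 304 record with 0 bytes is dropped by the filter, so four values remain.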

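Que - How can I actually sum them?

Ans - You can do it with reduce, fold, or aggregate. The three snippets below are alternatives; use one of them.

// reduce: no initial value, throws on an empty RDD
val sum = byteRdd.reduce((acc, b) => acc + b)

// fold: starts from a zero value of the element type
val sum = byteRdd.fold(0)((acc, b) => acc + b)

// aggregate: the accumulator type may differ from the element type
val sum = byteRdd.aggregate(0)(
  (acc, b) => acc + b,
  (acc1, acc2) => acc1 + acc2
)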

Note :: A sum of Int values can grow beyond what an Int can hold. So in most real-life cases we should use at least a Long as the accumulator. reduce and fold require the result to have the same type as the elements, so on an RDD[Int] they cannot accumulate into a Long unless the values are first mapped to Long. That leaves aggregate as the direct option.

val sum = byteRdd.aggregate(0L)(
  (acc, b) => acc + b,
  (acc1, acc2) => acc1 + acc2
)
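
To make the overflow concrete, here is a small sketch on a local Seq rather than an RDD. foldLeft stands in for aggregate here (an assumption made so the snippet runs without Spark), and the two values are made up so that their true sum is one past Int.MaxValue.

```scala
object OverflowSketch {
  // Made-up values whose true sum (2147483648) does not fit in an Int.
  val values: Seq[Int] = Seq(Int.MaxValue, 1)

  // Int accumulator: the addition silently wraps around.
  val intSum: Int = values.foldLeft(0)((acc, b) => acc + b)

  // Long accumulator: each Int is widened to Long before adding, so the sum is exact.
  val longSum: Long = values.foldLeft(0L)((acc, b) => acc + b)
}
```

With the Int accumulator the result wraps to Int.MinValue with no error raised, which is why the Long accumulator is the safer default.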

If you need several statistics, such as min, max and avg, I suggest computing them in a single aggregate rather than in several separate passes, like this:

// (count, sum, min, max); count and sum are Long to avoid overflow
val accInit = (0L, 0L, Int.MaxValue, Int.MinValue)

val (count, sum, min, max) = byteRdd.aggregate(accInit)(
  // seqOp: fold one value into the accumulator of a partition
  { case ((count, sum, min, max), b) =>
      (count + 1, sum + b, Math.min(min, b), Math.max(max, b)) },
  // combOp: merge the accumulators of two partitions
  { case ((count1, sum1, min1, max1), (count2, sum2, min2, max2)) =>
      (count1 + count2, sum1 + sum2, Math.min(min1, min2), Math.max(max1, max2)) }
)

val avg = sum.toDouble / count
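
If it helps to see how the two functions passed to aggregate cooperate, here is a rough local simulation without Spark. The split into two "partitions" is hypothetical, the values are the non-zero byte counts from the sample output in the question, and seqOp / combOp are just illustrative names for the two function arguments.

```scala
object AggregateSketch {
  // (count, sum, min, max)
  type Acc = (Long, Long, Int, Int)
  val zero: Acc = (0L, 0L, Int.MaxValue, Int.MinValue)

  // seqOp: fold one value into a partition-local accumulator.
  def seqOp(acc: Acc, b: Int): Acc = {
    val (c, s, mn, mx) = acc
    (c + 1, s + b, math.min(mn, b), math.max(mx, b))
  }

  // combOp: merge the accumulators of two partitions.
  def combOp(a: Acc, b: Acc): Acc =
    (a._1 + b._1, a._2 + b._2, math.min(a._3, b._3), math.max(a._4, b._4))

  // Hypothetical split of the sample byte counts into two partitions.
  val partitions: Seq[Seq[Int]] = Seq(Seq(6245, 3985), Seq(4085, 4179))

  // Each partition is folded on its own, then the partial results are merged.
  val (count, sum, min, max) =
    partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

  val avg: Double = sum.toDouble / count
}
```

For these four values the sketch gives count 4, sum 18494, min 3985, max 6245 and an average of 4623.5. On a real RDD Spark decides how the data is partitioned, which is why combOp has to give the same answer regardless of how the values are grouped.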