
Spark code to find maximum not working



I have an input file of the following form:

twid,usr,tc,txt
1234,abc,24,fgddf
3452,vcf,54,gdgddh
7684,fdsa,32,fgdhs
1234,abc,45,fgddf
3452,vcf,25,gdgddh


My intent is to get, for each value in the "twid" column, its maximum and minimum value in the "tc" column. For instance, twid 1234 has a maximum "tc" of 45 and a minimum of 24. I have the following code:

val tweet = sc.textFile(inputFile)
val MaxTweetId = tweet.map(x => (x,x.split(",")(2).toInt)).reduceByKey((x,y) => if(x>y) x else y)
val MinTweetId = tweet.map(x => (x,x.split(",")(2).toInt)).reduceByKey((x,y) => if(x>y) y else x)


But I am not getting the correct values for the maximum and the minimum. What am I doing wrong? I am expecting the output of MaxTweetId.collect to be of the form:

1234,abc,45,fgddf
3452,vcf,54,gdgddh
7684,fdsa,32,fgdhs

Answer

You're using x (the entire line) as the key, instead of just the first column. Since every line in the sample is distinct, each one ends up in its own group and reduceByKey has nothing to combine. You can first transform the RDD into a proper RDD[(Int, Int)] of (twid, tc) pairs and then find the max and min:

val keyValuePairs = tweet
  .map(_.split(","))
  .map { case Array(twid, _, tc, _) => (twid.toInt, tc.toInt) }

val MaxTweetId = keyValuePairs.reduceByKey(Math.max)
val MinTweetId = keyValuePairs.reduceByKey(Math.min)
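
For the sample data above (setting the header line aside for a moment), collecting these should produce something like:

MaxTweetId.collect  // Array((1234,45), (3452,54), (7684,32))
MinTweetId.collect  // Array((1234,24), (3452,25), (7684,32))

The pair order in the collected array is not guaranteed. Note that this yields (twid, tc) pairs rather than the full lines shown in the expected output; if you need the whole line, keep it as part of the value.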

EDIT: converting the "twid" field to Int isn't actually important here; it can stay a String:

val keyValuePairs = tweet
  .map(_.split(","))
  .map { case Array(twid, _, tc, _) => (twid, tc.toInt) }

And in case the pattern-matching syntax is confusing, this gives the same result (for valid records, at least):

val keyValuePairs = tweet
  .map(_.split(","))
  .map(x => (x(0), x(2).toInt))
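
As for invalid records: the sample input starts with a header line ("twid,usr,tc,txt"), and tc.toInt will throw a NumberFormatException on it. A minimal sketch for dropping it, assuming the header is exactly the first line of the file (header and dataLines are just illustrative names):

val header = tweet.first()                 // "twid,usr,tc,txt"
val dataLines = tweet.filter(_ != header)  // keep everything except the header

val keyValuePairs = dataLines
  .map(_.split(","))
  .map { case Array(twid, _, tc, _) => (twid, tc.toInt) }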