BigDataLearner - 3 months ago
Scala Question

Spark: how to get top 5 records using Spark

I am converting one of my MR jobs to Spark and am stuck at the point where I need to collect data that is not in an RDD but in a Scala TreeMap. The use case is to find the top 5 countries by the sum of bars (field 8) and stripes (field 9) in the flag, so I extract the data from the source and save it in a TreeMap.

Please advise a solution or a new approach to this problem.

Sample Input:

UK,3,4,245,56,1,1,0,0,3,1,0,1,0,1,0,0,red,0,1,1,0,0,0,0,0,0,0,white,red
Uruguay,2,3,178,3,2,0,0,9,3,0,0,1,1,1,0,0,white,0,0,0,1,1,0,0,0,0,0,white,white
US-Virgin-Isles,1,4,0,0,1,1,0,0,6,1,1,1,1,1,0,0,white,0,0,0,0,0,0,0,1,1,1,white,white
USA,1,4,9363,231,1,1,0,13,3,1,0,1,0,1,0,0,white,0,0,0,1,50,0,0,0,0,0,blue,red
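
(Fields are 1-indexed in the description above, so after splitting on commas, bars is at index 7 and stripes at index 8. A quick sanity check of those indices against the USA sample line:)

val usa = "USA,1,4,9363,231,1,1,0,13,3,1,0,1,0,1,0,0,white,0,0,0,1,50,0,0,0,0,0,blue,red".split(",")
val key = usa(7).toInt + usa(8).toInt   // 0 bars + 13 stripes = 13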


MapReduce mapper and reducer:

public static class StripeBarMapper extends Mapper<Object, Text, IntWritable, Text> {

    // Keeps only the 5 entries with the largest (bars + stripes) sum seen by this mapper
    private TreeMap<IntWritable, Text> Top5 = new TreeMap<IntWritable, Text>();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String[] flags = value.toString().split(",");

        Top5.put(new IntWritable(Integer.parseInt(flags[7]) + Integer.parseInt(flags[8])), new Text(flags[0]));

        if (Top5.size() > 5) {
            Top5.remove(Top5.firstKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit the surviving entries once all input has been mapped
        for (IntWritable count : Top5.keySet()) {
            context.write(count, Top5.get(count));
        }
    }
}

public static class StripeBarReducer extends Reducer<IntWritable, Text, Text, Text> {

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        System.out.print("reducer received: " + key.toString() + " --> ");

        for (Text country : values)
            context.write(new Text(country), new Text(key.toString()));
    }
}


Spark job (my attempt):

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable.TreeMap

val inputFile = "Data\\country\\flag.data"

val conf = new SparkConf().setAppName("Top 5 Countries by Sum of bars and strips in flag").setMaster("local")
val sc = new SparkContext(conf)

val txtFileLines = sc.textFile(inputFile).cache()

// Driver-side TreeMap that I am trying to fill from inside the transformation
var tm = TreeMap(1 -> "one")

val Tops = txtFileLines.map(_.split(","))
  .map { s =>
    if (tm.size > 5) {
      tm -= tm.firstKey
    }
    tm += ((s(7).toInt + s(8).toInt) -> s(0))
  }
  //.sortBy( x => x.toString(), ascending = false, 1).saveAsTextFile("output\\country\\byStripsBar")
  //.reduce(tm.keys.foreach { x => ($x._1, $x._2) })
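
Note that the map closure above is serialized and executed on the executors, so each task works on its own copy of tm and the driver-side TreeMap is never updated by the job. A rough, untested sketch of the same bounded-map idea expressed inside Spark itself, using RDD.aggregate with a capped sorted map per partition (reusing the txtFileLines RDD defined above; the variable name is just for illustration):

import scala.collection.immutable.SortedMap

// Fold records into a SortedMap capped at 5 entries per partition, then merge
// the per-partition maps -- the same pattern as the MR mapper plus cleanup().
// Like the MR version, ties on the sum overwrite each other.
val top5ByAggregate = txtFileLines
  .map(_.split(","))
  .map(f => (f(7).toInt + f(8).toInt, f(0)))        // (bars + stripes, country)
  .aggregate(SortedMap.empty[Int, String])(
    (acc, kv) => (acc + kv).takeRight(5),           // keep the 5 largest keys
    (a, b)    => (a ++ b).takeRight(5)              // merge partial results
  )

aggregate returns the merged map to the driver, with keys in ascending order.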


Solution:
Following the same approach as in the MapReduce job was not appropriate. I solved the problem using sortBy:

val inputFile = "Data\\country\\flag.data"

val conf = new SparkConf().setAppName("Top 5 Countries by Sum of bars and strips in flag").setMaster("local")

val sc = new SparkContext(conf)

val txtFileLines = sc.textFile(inputFile).cache()

val Strips = txtFileLines.map(_.split(","))
                         .map(line => (line(0) + "," + (line(7).toInt + line(8).toInt)))  // "country,bars+stripes"
                         .sortBy(x => x.split(",")(1).trim().toInt, ascending = false)    // sort by the sum, descending
                         .take(5)
                         //.saveAsTextFile("output\\country\\byStripsBar")

Strips.foreach { line => println(line) }

Answer

I solved the problem using sortBy and take():

import org.apache.spark.{SparkConf, SparkContext}

val inputFile = "Data\\country\\flag.data"

val conf = new SparkConf().setAppName("Top 5 Countries by Sum of bars and strips in flag").setMaster("local")

val sc = new SparkContext(conf)

val txtFileLines = sc.textFile(inputFile).cache()

val Strips = txtFileLines.map(_.split(","))
                         .map(line => (line(0) + "," + (line(7).toInt + line(8).toInt)))  // "country,bars+stripes"
                         .sortBy(x => x.split(",")(1).trim().toInt, ascending = false)    // sort by the sum, descending
                         .take(5)                                                         // Array[String] collected to the driver

Strips.foreach { line => println(line) }

// take(5) returns a local array, so parallelize it again to write a single output file
sc.parallelize(Strips, 1).saveAsTextFile("output\\country\\byStripsBar")
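
As a footnote, the same top 5 can probably be computed without the full sortBy shuffle: RDD.top keeps only the N largest elements per partition and merges them on the driver. A small untested sketch, reusing the txtFileLines RDD above and keying on the numeric sum instead of a string:

// (sum, country) pairs; the default tuple ordering compares the sum first,
// so top(5) returns the five largest sums in descending order
val top5 = txtFileLines.map(_.split(","))
                       .map(f => (f(7).toInt + f(8).toInt, f(0)))
                       .top(5)

top5.foreach { case (sum, country) => println(country + "," + sum) }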