Siva Siva - 1 year ago 217
Scala Question

Spark, Scala: How to subtract the values in RDD pairs based on their key?

I have a couple of RDDs of type RDD[(String, Int)]. I'd like to subtract the integer values of entries that share the same key.

Here's an example: If the input RDDs are

valid_record = (TcustomerTDL_2016266,16)
deleted_record = (TcustomerTDL_2016266,8)

Since the keys are the same, the integer values should be subtracted, so the expected result is (TcustomerTDL_2016266,8), which is 16 - 8 = 8. I tried using subtractByKey, but it doesn't seem to work.

I used the following code:

val changes_total = valid_record.subtractByKey(deleted_record)

Let me know if there is an alternative way to do this or if this is incorrect.

Here is the code:

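import org.apache.spark.{SparkConf, SparkContext}

val Conf = new SparkConf().setAppName("Module").setMaster("local")
val sc = new SparkContext(Conf)
val incoming_file = sc.wholeTextFiles("D:/Users/Documents/siva_hourly") //changed code
// Key each file by element 6 of its "/"-split path and split its contents into lines
val output = incoming_file.map { case (k, v) => (k.split("/")(6), v.split("\\r?\\n")) }
// Keep only the third \001-delimited field of each line
val change_type = output.map { case (k, v) => (k, v.map(x => x.split("\001")(2))) } //changed code
// Count "D" entries per file, then re-key by the first two "_"-separated parts of the key
val change_delete_count = change_type.map { case (k, v) => (k, v.filter { x => x == "D" }.length) }
val change_record_foreach4 = change_delete_count.map { case (k, v) => (k.split("_"), v) }
val change_record_foreach3 = change_record_foreach4.map { case (k, v) => (k(0) + '_' + k(1), v) }
// Count "A" and "I" entries per file, then re-key the same way
val change_valid_count = change_type.map { case (k, v) => (k, v.filter { x => x == "A" || x == "I" }.length) }
val change_record_foreach = change_valid_count.map { case (k, v) => (k.split("_"), v) }
val change_record_foreach1 = change_record_foreach.map { case (k, v) => (k(0) + '_' + k(1), v) }
// Sum the counts per key
val valid_record = change_record_foreach1.reduceByKey((x, y) => x + y)
val deleted_record = change_record_foreach3.reduceByKey((x, y) => x + y)
val changes_total = valid_record.subtractByKey(deleted_record)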

Answer Source

That is not what subtractByKey does. It removes every pair whose key also appears in the other RDD; it does not subtract the values.

Here is an example of how subtractByKey works. Suppose you have two pair RDDs:

rdd = {(1, 2), (3, 4), (3, 6)}
other = {(3, 9)}

The result of rdd.subtractByKey(other) is:

{(1, 2)}
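
To make that behaviour concrete, here is a minimal sketch that reproduces the example in local mode. The object name SubtractByKeyDemo, the helper remainingPairs, and the local[1] master are assumptions made for illustration; they are not part of the question's code.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; the names here are illustrative only.
object SubtractByKeyDemo {
  // Returns the pairs of rdd whose key does not occur in other.
  def remainingPairs(sc: SparkContext): List[(Int, Int)] = {
    val rdd = sc.parallelize(Seq((1, 2), (3, 4), (3, 6)))
    val other = sc.parallelize(Seq((3, 9)))
    // Key 3 appears in other, so both (3, 4) and (3, 6) are dropped.
    rdd.subtractByKey(other).collect().toList
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SubtractByKeyDemo").setMaster("local[1]")
    val sc = new SparkContext(conf)
    try {
      println(remainingPairs(sc)) // List((1,2))
    } finally {
      sc.stop()
    }
  }
}
```

Applied to the question's data, both RDDs contain the key TcustomerTDL_2016266, so subtractByKey would remove that pair and return an empty RDD rather than (TcustomerTDL_2016266,8).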

To subtract the values, join the two RDDs on the key and then subtract within each joined pair:

val joinRDD = valid_record.join(deleted_record)
val resultRDD = joinRDD.mapValues(x => x._1 - x._2)
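
One thing to be aware of: join is an inner join, so a key that appears only in valid_record (no deletions) is dropped from the result. If such keys should be kept with their full count, a leftOuterJoin may be a better fit. The sketch below is one possible variant, not the answer's original method. The object name SubtractValuesByKey, the helper subtractValues, and the second key TcustomerTDL_2016267 are hypothetical and added only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object; the names here are illustrative only.
object SubtractValuesByKey {
  // For every key in valid, subtract the matching value in deleted.
  // A key with no entry in deleted is treated as having 0 deletions.
  def subtractValues(valid: RDD[(String, Int)],
                     deleted: RDD[(String, Int)]): RDD[(String, Int)] =
    valid.leftOuterJoin(deleted).mapValues { case (v, d) => v - d.getOrElse(0) }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SubtractValuesByKey").setMaster("local[1]")
    val sc = new SparkContext(conf)
    try {
      // The second key is made-up sample data with no matching deletions.
      val valid_record = sc.parallelize(Seq(("TcustomerTDL_2016266", 16), ("TcustomerTDL_2016267", 5)))
      val deleted_record = sc.parallelize(Seq(("TcustomerTDL_2016266", 8)))
      subtractValues(valid_record, deleted_record).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Keys that appear only in deleted_record are still dropped by this variant; a fullOuterJoin would be needed to keep those as well.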