Anna - 3 years ago
Scala Question

How to build a look up function with multiple keys in spark

I am new to Spark and asked a similar question last week. The code compiled but didn't return the right results, so I really don't know what to do. Here is my problem: I have a table A containing 3 columns, like this

+---+---+---+
|A1 |A2 |A3 |
+---+---+---+
|a  |b  |c  |
+---+---+---+


and another table B like this

+---+---+---+---+---+---+---+---+---+
|B1 |B2 |B3 |B4 |B5 |B6 |B7 |B8 |B9 |
+---+---+---+---+---+---+---+---+---+
|1  |a  |3  |4  |5  |b  |7  |8  |c  |
+---+---+---+---+---+---+---+---+---+


My logic is: A1, A2, A3 are my keys, and they correspond to B2, B6, B9 in table B. I need to build a lookup function that takes A1, A2, A3 as the key and returns B8.

This is what I tried last week:

//getting the data in to dataframe
val clsrowRDD = clsfile.map(_.split("\t")).map(p => Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8)))
val clsDataFrame = sqlContext.createDataFrame(clsrowRDD, clsschema)

//mapping the three key with the value
val smallRdd = clsDataFrame.rdd.map{row: Row => (mutable.WrappedArray.make[String](Array(row.getString(1), row.getString(5), row.getString(8))), row.getString(7))}

val lookupMap:Map[mutable.WrappedArray[String], String] = smallRdd.collectAsMap()

//build the look up function
def lookup(lookupMap: Map[mutable.WrappedArray[String],String]) =
udf((input: mutable.WrappedArray[String]) => lookupMap.lift(input))

//call the function
val combinedDF = mstrDataFrame.withColumn("ENTP_CLS_CD",lookup(lookupMap)($"SRC_SYS_CD",$"ORG_ID",$"ORG_CD"))


This code compiles, but doesn't return the results I need. I think it's because I am passing in an array as the key, and I don't actually have an array inside my table. But when I tried changing the map type to
Map[(String,String,String),String]
, I didn't know how to pass the tuple into the function.

Tons of thanks.

Answer Source

If you are trying to get the B8 value for every row where A1 matches B2, A2 matches B6, and A3 matches B9, then simple join and select methods should do the trick; building a lookup map only adds complexity.

As you explained, you have two dataframes, df1 and df2, as

+---+---+---+
|A1 |A2 |A3 |
+---+---+---+
|a  |b  |c  |
+---+---+---+

+---+---+---+---+---+---+---+---+---+
|B1 |B2 |B3 |B4 |B5 |B6 |B7 |B8 |B9 |
+---+---+---+---+---+---+---+---+---+
|1  |a  |3  |4  |5  |b  |7  |8  |c  |
|1  |a  |3  |4  |5  |b  |7  |8  |e  |
+---+---+---+---+---+---+---+---+---+

A simple join and select can be done as

df1.join(df2, $"A1" === $"B2" && $"A2" === $"B6" && $"A3" === $"B9", "inner").select("B8")

which should give you

+---+
|B8 |
+---+
|8  |
+---+
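To make the join approach easy to try end-to-end, here is a minimal self-contained sketch, assuming Spark 2.x with spark-sql on the classpath and a local master (the DataFrame contents mirror the example tables above):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("lookup-join")
  .getOrCreate()
import spark.implicits._

// Rebuild the two example tables as DataFrames
val df1 = Seq(("a", "b", "c")).toDF("A1", "A2", "A3")
val df2 = Seq(
  ("1", "a", "3", "4", "5", "b", "7", "8", "c"),
  ("1", "a", "3", "4", "5", "b", "7", "8", "e")
).toDF("B1", "B2", "B3", "B4", "B5", "B6", "B7", "B8", "B9")

// Only the first df2 row matches on all three keys,
// so a single "8" comes back
df1.join(df2, $"A1" === $"B2" && $"A2" === $"B6" && $"A3" === $"B9", "inner")
  .select("B8")
  .show(false)
```

Because the join condition names columns from both sides explicitly, there is no ambiguity even though both tables are in scope.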

I hope the answer is helpful.

Updated

According to what I understood from your question and the comments below, you are confused about how to pass an array to your lookup udf. For that you can use the array function. I have modified some parts of your almost-correct code to make it work:

//mapping the three key with the value
val smallRdd = clsDataFrame.rdd
  .map{row: Row => (mutable.WrappedArray.make[String](Array(row.getString(1), row.getString(5), row.getString(8))), row.getString(7))}

val lookupMap: collection.Map[mutable.WrappedArray[String], String] = smallRdd.collectAsMap()

//build the look up function
def lookup(lookupMap: collection.Map[mutable.WrappedArray[String],String]) =
udf((input: mutable.WrappedArray[String]) => lookupMap.lift(input))

//call the function
val combinedDF  = mstrDataFrame.withColumn("ENTP_CLS_CD",lookup(lookupMap)(array($"SRC_SYS_CD",$"ORG_ID",$"ORG_CD")))

You should have

+----------+------+------+-----------+
|SRC_SYS_CD|ORG_ID|ORG_CD|ENTP_CLS_CD|
+----------+------+------+-----------+
|a         |b     |c     |8          |
+----------+------+------+-----------+
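As a side note on the Map[(String,String,String),String] idea from your question: a tuple-keyed map also works, and the udf can simply take the three key columns as separate arguments, so no array is needed. A hedged sketch, reusing the clsDataFrame and mstrDataFrame names from the code above:

```scala
import org.apache.spark.sql.functions.udf

// Build a map keyed by the (B2, B6, B9) tuple, valued by B8
val tupleMap: collection.Map[(String, String, String), String] =
  clsDataFrame.rdd
    .map(row => ((row.getString(1), row.getString(5), row.getString(8)), row.getString(7)))
    .collectAsMap()

// A udf over three String columns; get returns None (null) for missing keys
def tupleLookup(m: collection.Map[(String, String, String), String]) =
  udf((a: String, b: String, c: String) => m.get((a, b, c)))

val combinedDF = mstrDataFrame
  .withColumn("ENTP_CLS_CD", tupleLookup(tupleMap)($"SRC_SYS_CD", $"ORG_ID", $"ORG_CD"))
```

Note that collectAsMap pulls the whole lookup table to the driver, so either variant only makes sense when table A's source is small; otherwise prefer the join.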