Bindumalini KK - 1 year ago
Scala Question

Joining Spark dataframes on the key

I have constructed two dataframes. How can we join multiple Spark dataframes?

For example: PersonDf and ProfileDf share a common column, "personId" (the key). How can we get one DataFrame that combines PersonDf and ProfileDf?

Answer Source

Alias approach using Scala:

You can use case classes to prepare a sample dataset. This is optional; for example, you can also get a DataFrame from hiveContext.sql.

import org.apache.spark.sql.functions.col

case class Person(name: String, age: Int, personid: Int)

case class Profile(name: String, personid: Int, profileDescription: String)

val df1 = sqlContext.createDataFrame(Person("Bindu",20,  2) :: Person("Raphel",25, 5) :: Person("Ram",40, 9):: Nil)

val df2 = sqlContext.createDataFrame(Profile("Spark",2,  "SparkSQLMaster") :: Profile("Spark",5, "SparkGuru") :: Profile("Spark",9, "DevHunter"):: Nil)

// Alias each DataFrame so its columns can be referenced by alias, which improves readability

val df_asPerson = df1.as("dfperson")
val df_asProfile = df2.as("dfprofile")

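val joined_df = df_asPerson.join(
    df_asProfile, col("dfperson.personid") === col("dfprofile.personid"), "inner")

// Select columns through the aliases, since both sides have a "name" column
joined_df.select(
    col("dfperson.name"), col("dfperson.age"), col("dfprofile.name"), col("dfprofile.profileDescription")).show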

Sample temp table approach (which I personally don't like):

The reason to use the registerTempTable(tableName) method on a DataFrame is that, in addition to the Spark-provided DataFrame methods, you can also issue SQL queries through sqlContext.sql(sqlQuery) that use that DataFrame as an SQL table. The tableName parameter specifies the name by which the DataFrame is referenced in those SQL queries.


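// Register both DataFrames under the table names used in the query
df1.registerTempTable("dfperson")
df2.registerTempTable("dfprofile")

sqlContext.sql("""SELECT dfperson.name, dfperson.age, dfprofile.profileDescription
                  FROM  dfperson JOIN  dfprofile
                  ON dfperson.personid = dfprofile.personid""")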
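
On Spark 2.0 and later, registerTempTable is deprecated in favour of createOrReplaceTempView, and SparkSession replaces sqlContext as the entry point. The following is only a minimal sketch of the same temp table idea under those assumptions. The names personDf, profileDf and sqlJoined and the local[*] master are made up for illustration; in spark-shell a spark session already exists and you would skip building one.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.x+ on the classpath; local session for illustration only
val spark = SparkSession.builder().master("local[*]").appName("temp-view-sketch").getOrCreate()
import spark.implicits._

// Same sample rows as in the answer, built from tuples instead of case classes
val personDf = Seq(("Bindu", 20, 2), ("Raphel", 25, 5), ("Ram", 40, 9))
  .toDF("name", "age", "personid")
val profileDf = Seq(("Spark", 2, "SparkSQLMaster"), ("Spark", 5, "SparkGuru"), ("Spark", 9, "DevHunter"))
  .toDF("name", "personid", "profileDescription")

// Expose each DataFrame to SQL under the table name used in the query
personDf.createOrReplaceTempView("dfperson")
profileDf.createOrReplaceTempView("dfprofile")

val sqlJoined = spark.sql(
  """SELECT dfperson.name, dfperson.age, dfprofile.profileDescription
    |FROM dfperson JOIN dfprofile
    |ON dfperson.personid = dfprofile.personid""".stripMargin)

sqlJoined.show()
```

The views live only as long as the session, so nothing is written to a metastore.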

If you want to know more about joins, please see this nice post: beyond-traditional-join-with-apache-spark


Note: As mentioned by @RaphaelRoth,

val resultDf = PersonDf.join(ProfileDf, Seq("personId")) is a good approach for an inner join on a key column that has the same name on both sides, since the key column then appears only once in the result instead of being duplicated from both DataFrames.
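
To make the note concrete, here is a small sketch of the Seq-based join using the same sample rows as above. The names personDf and profileDf and the local[*] session are assumptions for illustration, and the key column is spelled "personid" to match the sample data. Only the join key is deduplicated, so the "name" column, which exists on both sides, still shows up twice in the result.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.x+; in spark-shell the `spark` session is already provided
val spark = SparkSession.builder().master("local[*]").appName("seq-join-sketch").getOrCreate()
import spark.implicits._

val personDf = Seq(("Bindu", 20, 2), ("Raphel", 25, 5), ("Ram", 40, 9))
  .toDF("name", "age", "personid")
val profileDf = Seq(("Spark", 2, "SparkSQLMaster"), ("Spark", 5, "SparkGuru"), ("Spark", 9, "DevHunter"))
  .toDF("name", "personid", "profileDescription")

// Joining on a Seq of column names does an inner join by default
// and keeps a single copy of the key column in the output
val resultDf = personDf.join(profileDf, Seq("personid"))

resultDf.printSchema()
resultDf.show()
```

If the non-key columns also clash, as "name" does here, renaming one side before the join (for example with withColumnRenamed) or using the alias approach keeps later selects unambiguous.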
