Scala Question

Joining Spark dataframes on the key

I have constructed two dataframes. How can we join multiple Spark dataframes?

For example: PersonDf and ProfileDf share a common column "personId" (the key). How can we get one DataFrame combining PersonDf and ProfileDf?

Answer

Alias approach using Scala:

You can use case classes to prepare the sample dataset. This part is optional; for example, you could get a DataFrame from hiveContext.sql as well.

import org.apache.spark.sql.functions.col

case class Person(name: String, age: Int, personid: Int)

case class Profile(name: String, personid: Int, profileDescription: String)

val df1 = sqlContext.createDataFrame(Person("Bindu",20,  2) :: Person("Raphel",25, 5) :: Person("Ram",40, 9):: Nil)


val df2 = sqlContext.createDataFrame(Profile("Spark",2,  "SparkSQLMaster") :: Profile("Spark",5, "SparkGuru") :: Profile("Spark",9, "DevHunter"):: Nil)

// alias the DataFrames so columns can be referenced unambiguously, which increases readability

val df_asPerson = df1.as("dfperson")
val df_asProfile = df2.as("dfprofile")


val joined_df = df_asPerson.join(
    df_asProfile, col("dfperson.personid") === col("dfprofile.personid"), "inner")


joined_df.select(
    col("dfperson.name"),col("dfperson.age"), col("dfprofile.name"), col("dfprofile.profileDescription")).show

Temp table approach (which I personally don't prefer):

The reason to use the registerTempTable(tableName) method for a DataFrame is that, in addition to using Spark's DataFrame methods, you can also issue SQL queries via the sqlContext.sql(sqlQuery) method that use the DataFrame as an SQL table. The tableName parameter specifies the table name under which that DataFrame is visible in the SQL queries.

df_asPerson.registerTempTable("dfperson")
df_asProfile.registerTempTable("dfprofile")

sqlContext.sql("""SELECT dfperson.name, dfperson.age, dfprofile.profileDescription
                  FROM  dfperson JOIN  dfprofile
                  ON dfperson.personid == dfprofile.personid""")
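Note that in Spark 2.x, registerTempTable is deprecated in favor of createOrReplaceTempView, so on a newer version the equivalent would be:

df_asPerson.createOrReplaceTempView("dfperson")
df_asProfile.createOrReplaceTempView("dfprofile")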

If you want to know more about joins, please see this nice post: beyond-traditional-join-with-apache-spark


Note: As mentioned by @RaphaelRoth,

val resultDf = PersonDf.join(ProfileDf, Seq("personId"))

is a good approach, since it does not duplicate the key column from both sides when you do an inner join on the same key.
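Applied to the sample DataFrames above, that would look something like this (note that the sample data uses the lowercase column name personid):

// joining on a Seq of column names keeps a single personid column in the result
val resultDf = df1.join(df2, Seq("personid"))
resultDf.show()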