Pedro Bernardo - 1 year ago
Python Question

Pyspark DataFrame - How to use variables to make join?

I'm having trouble joining two Spark DataFrames in Python. I renamed the columns of both DataFrames so that the names are unique to each one, which lets me tell later which column came from which DataFrame. This is how I renamed them (firstDf and secondDf are Spark DataFrames created with createDataFrame):

oldColumns = firstDf.schema.names
newColumns = list(map(lambda x: "{}.{}".format('firstDf', x), oldColumns))
firstDf = firstDf.toDF(*newColumns)

I repeated this for the second DataFrame. Then I tried to join them, using the following code:

from pyspark.sql.functions import *

firstColumn = 'firstDf.firstColumn'
secondColumn = 'secondDf.firstColumn'
joinedDF = firstDf.join(secondDf, col(firstColumn) == col(secondColumn), 'inner')

Using it like this I get the following error:

AnalysisException "cannot resolve 'firstDf.firstColumn' given input columns: [firstDf.firstColumn, ...];"

I include the message only to show that the column does appear in the list of input columns.

If I don't rename the DataFrames' columns, I can join them with this code:

joinedDf = firstDf.join(secondDf, firstDf.firstColumn == secondDf.firstColumn, 'inner')

But this gives me a DataFrame with ambiguous column names.

Any ideas on how to approach this?

Answer Source

Generally speaking, don't use dots in column names. A dot has special meaning (it separates a table name from a column, or a struct from one of its fields), so a name that contains one has to be quoted with backticks, for example col("`firstDf.firstColumn`"), before it is recognized correctly.
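
If the prefix is only there to tell the columns apart after the join, a separator other than a dot avoids the problem altogether. A minimal sketch, assuming an underscore separator and a hypothetical helper named prefixed_names (neither comes from the question):

```python
def prefixed_names(names, prefix, sep="_"):
    """Return the column names with `prefix` prepended, joined by `sep`."""
    if "." in sep:
        # A dot would bring back the resolution problem described above.
        raise ValueError("separator must not contain a dot")
    return ["{}{}{}".format(prefix, sep, name) for name in names]


# Intended use with a Spark DataFrame (assumes `firstDf` already exists):
#   firstDf = firstDf.toDF(*prefixed_names(firstDf.columns, "firstDf"))
print(prefixed_names(["firstColumn", "secondColumn"], "firstDf"))
# ['firstDf_firstColumn', 'firstDf_secondColumn']
```

With names like these, col('firstDf_firstColumn') can be used in the join condition without any quoting.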

For equi joins all you need is a column name:

from pyspark.sql.functions import col

firstDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn"))
secondDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn"))

column = 'firstColumn'
firstDf.join(secondDf, [column], 'inner')

## DataFrame[firstColumn: bigint, secondColumn: string, secondColumn: string]

For complex cases use table aliases:

firstColumn = 'firstDf.firstColumn'
secondColumn = 'secondDf.firstColumn'

firstDf.alias("firstDf").join(
    secondDf.alias("secondDf"),
    # After aliasing, the prefix resolves to the table name
    col(firstColumn) == col(secondColumn),
    "inner"
)

## DataFrame[firstColumn: bigint, secondColumn: string, firstColumn: bigint, secondColumn: string]
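
The aliased join still returns duplicate column names, which is what the question was trying to avoid. One possible follow-up is to select every column through its table alias and give it a dot-free output name. This is only a sketch: the helper names aliased_selection and join_with_unique_names and the underscore separator are assumptions, not part of the original answer.

```python
def aliased_selection(table_alias, names, sep="_"):
    """Pair each qualified name ("alias.column") with a dot-free output name."""
    return [
        ("{}.{}".format(table_alias, name), "{}{}{}".format(table_alias, sep, name))
        for name in names
    ]


def join_with_unique_names(first_df, second_df, key):
    """Inner-join two DataFrames on `key` and prefix every output column."""
    # Imported here so the pure-Python helper above works without Spark installed.
    from pyspark.sql.functions import col

    pairs = (aliased_selection("firstDf", first_df.columns)
             + aliased_selection("secondDf", second_df.columns))
    joined = first_df.alias("firstDf").join(
        second_df.alias("secondDf"),
        # The alias prefix resolves to the aliased DataFrame.
        col("firstDf." + key) == col("secondDf." + key),
        "inner",
    )
    # Rename each column while the aliases are still in scope.
    return joined.select([col(qualified).alias(output) for qualified, output in pairs])
```

Called as join_with_unique_names(firstDf, secondDf, 'firstColumn'), this should yield the columns firstDf_firstColumn, firstDf_secondColumn, secondDf_firstColumn and secondDf_secondColumn.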

You could also use parent frames directly:

column = 'firstColumn'

firstDf.join(secondDf, firstDf[column] == secondDf[column])