Active_user - 9 months ago
Python Question

Compare column names in two data frames pyspark

I have two data frames in pyspark, df and data. Their schemas are shown below:

>>> df.printSchema()
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- address: string (nullable = true)
|-- nation: string (nullable = true)
|-- Date: timestamp (nullable = false)
|-- ZipCode: integer (nullable = true)
|-- car: string (nullable = true)
|-- van: string (nullable = true)

>>> data.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- address: string (nullable = true)
|-- nation: string (nullable = true)
|-- date: string (nullable = true)
|-- zipcode: integer (nullable = true)


Now I want to add the columns car and van to my data data frame by comparing the two schemas.

More generally, I want to compare the two data frames: if the columns are the same, do nothing; if they differ, add the missing columns to the data frame that doesn't have them.

How can we achieve that in pyspark?

FYI, I am using Spark 1.6.


Once the columns are added to a data frame, the values for those columns should be null.

For example, here we are adding columns to the data data frame, so the columns car and van in data should contain null values, but the same columns in df should keep their original values.

What happens if there are more than 2 new columns to be added?
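However many columns differ, a set symmetric difference picks them all up in one pass; a minimal sketch with plain Python lists standing in for the two schemas (the extra column names here are hypothetical):

```python
# Lowercased column names from two hypothetical data frames
df_names = ["id", "name", "address", "nation", "date", "zipcode", "car", "van", "bike"]
data_names = ["id", "name", "address", "nation", "date", "zipcode"]

# Symmetric difference: every column that exists in only one of the frames
col_diff = set(df_names) ^ set(data_names)
print(sorted(col_diff))  # prints ['bike', 'car', 'van'] - three columns, same one-liner
```

The loop in the answer below then iterates over this set, so two, three, or ten missing columns are all handled the same way.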

Answer Source

As the schema is nothing but a StructType consisting of a list of StructFields, we can retrieve the field lists, compare them, and find the missing columns:

from pyspark.sql.functions import lit

df_schema = df.schema.fields
data_schema = data.schema.fields
df_names = [x.name.lower() for x in df_schema]
data_names = [x.name.lower() for x in data_schema]
if df_names != data_names:
    # symmetric difference: columns present in only one of the two frames
    col_diff = set(df_names) ^ set(data_names)
    # (name, dataType) of every missing column, taken from whichever schema has it
    col_list = [(x.name, x.dataType) for x in df_schema + data_schema
                if x.name.lower() in col_diff]
    for name, dtype in col_list:
        if name.lower() in df_names:
            data = data.withColumn(name, lit(None).cast(dtype))
        else:
            df = df.withColumn(name, lit(None).cast(dtype))
else:
    print "Nothing to do"

You mentioned adding a column only when it has no null values, but the columns that differ between your schemas are all nullable, so I have not used that check. If you need it, add a check on nullable as below:

col_list = [(x.name, x.dataType) for x in df_schema + data_schema
            if x.name.lower() in col_diff and not x.nullable]
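To illustrate that nullable filter without a Spark session, here is a sketch using a namedtuple as a stand-in for pyspark.sql.types.StructField (the field values are made up):

```python
from collections import namedtuple

# Minimal stand-in for pyspark.sql.types.StructField
Field = namedtuple("Field", ["name", "dataType", "nullable"])

fields = [Field("car", "string", True), Field("Date", "timestamp", False)]
col_diff = {"car", "date"}  # lowercased names of the differing columns

# Keep only the differing columns that are declared non-nullable
col_list = [(f.name, f.dataType) for f in fields
            if f.name.lower() in col_diff and not f.nullable]
print(col_list)  # the nullable 'car' column is filtered out
```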

Please check the documentation for more about StructType and StructField: https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.types.StructType
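To see the whole alignment end to end without a cluster, the same logic can be sketched with dict-of-lists stand-ins for the two data frames (the sample rows are invented):

```python
# Dict-of-lists stand-ins for the df and data frames (hypothetical rows)
df = {"id": [1], "name": ["a"], "car": ["bmw"], "van": ["ford"]}
data = {"id": [2], "name": ["b"]}

col_diff = set(df) ^ set(data)  # columns present in only one frame
for col in col_diff:
    if col in df:
        data[col] = [None] * len(data["id"])  # added columns are filled with nulls
    else:
        df[col] = [None] * len(df["id"])

print(sorted(data.keys()))  # both frames now expose the same columns
```

The shape is the same as the Spark version: compute the symmetric difference once, then add each missing column, filled with nulls, to whichever side lacks it.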