Dataminer - 1 month ago
Scala Question

Transforming Spark Dataframe Column

I am working with Spark DataFrames. I have a categorical variable in my DataFrame with many levels. I am attempting a simple transformation of this variable: keep only the levels that have more than n observations (say, 1000) and group all other levels into an "Others" category.

I am fairly new to Spark, so I have been struggling to implement this. This is what I have been able to achieve so far:

// Extract all levels having > 1000 observations (df is the DataFrame name)
import org.apache.spark.sql.functions.desc
val levels_count = df.groupBy("Col_name").count.filter("count > 1000").sort(desc("count"))

// Extract the level names
val level_names = levels_count.select("Col_name").rdd.map(x => x(0)).collect


This gives me an Array containing the level names that I would like to retain. Next, I need to define the transformation function to apply to the column. This is where I am getting stuck. I believe I need to create a user-defined function (UDF). This is what I tried:

// Define UDF
val var_transform = udf((x: String) => {
  if (level_names contains x) x
  else "others"
})

// Apply UDF to the column
val df_new = df.withColumn("Var_new", var_transform($"Col_name"))


However, when I try
df_new.show
it throws a "Task not serializable" exception. What am I doing wrong? Also, is there a better way to do this?

Thanks!

Answer

Here is a solution that is, in my opinion, better for such a simple transformation: stick to the DataFrame API and trust Catalyst and Tungsten to optimise it (e.g. by choosing a broadcast join):

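import org.apache.spark.sql.functions.{coalesce, lit}

// Count each level, renaming the column so the join condition below is unambiguous
val levels_count = df
  .groupBy($"Col_name".as("new_col_name"))
  .count
  .filter("count > 1000")

// Left outer join: rows whose level did not pass the filter get a null,
// which coalesce then replaces with "other"
val df_new = df
  .join(levels_count, $"Col_name" === $"new_col_name", joinType = "leftOuter")
  .drop("Col_name")
  .withColumn("new_col_name", coalesce($"new_col_name", lit("other")))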
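
If you would rather keep the array of level names collected in the question, a UDF-free variant is also possible with the built-in `when` and `isin` column functions. This is not part of the answer above, only a minimal sketch of a different technique, and it assumes Spark 2.x or later (for `SparkSession`). The sample data, the `minCount` threshold, the object name, and the local session are hypothetical and exist only to make the example self-contained:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, when}

object TopLevelsSketch {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only (assumption: Spark 2.x or later)
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("top-levels-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data: "a" appears 3 times, "b" twice, "c" once
    val df = Seq("a", "a", "a", "b", "b", "c").toDF("Col_name")

    // Hypothetical threshold, kept small to fit the sample data
    val minCount = 2L

    // Collect the names of the levels with more than minCount observations
    val levelNames: Array[String] = df
      .groupBy("Col_name")
      .count()
      .filter(col("count") > minCount)
      .select("Col_name")
      .as[String]
      .collect()

    // Keep the frequent levels as they are; replace everything else with "others"
    val dfNew = df.withColumn(
      "Var_new",
      when(col("Col_name").isin(levelNames: _*), col("Col_name"))
        .otherwise(lit("others"))
    )

    dfNew.show()
    spark.stop()
  }
}
```

With this sample data only "a" has more than 2 observations, so the rows for "b" and "c" end up as "others" in `Var_new`. Because the level names are embedded in the column expression itself, no closure is shipped to the executors. For a very large number of retained levels, the join shown above is probably the better fit.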