jay jay - 7 months ago 212
Python Question

Passing a data frame column and external list to udf under withColumn

I have a Spark DataFrame with the following structure. The bodyText_token column holds the tokens (a processed set of words), and I have a nested list of defined keywords.

|-- id: string (nullable = true)
|-- body: string (nullable = true)
|-- bodyText_token: array (nullable = true)


I need to check how many tokens fall under each keyword list and add the result as a new column of the existing DataFrame.
For example, if
tokens =["become", "farmer","rally","workers","student"]

the result will be -> [1,2,0]

The following function worked as expected.

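def label_maker_topic(tokens, topic_words):
    # For each keyword list, count how many tokens appear in it.
    twt_list = []
    for i in range(0, len(topic_words)):
        count = 0
        for tkn in tokens:
            if tkn in topic_words[i]:
                count += 1
        # Record the count for this keyword list.
        twt_list.append(count)

    return twt_list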
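
For a quick check outside Spark, the same counting can be written as a plain Python function. The sketch below is a compact equivalent, not the asker's code: `count_topic_hits` is a hypothetical name, and the contents of `keyword_list` are made up (the question does not show the real lists), chosen only so that the example tokens above give [1, 2, 0].

```python
def count_topic_hits(tokens, topic_words):
    """Return, for each keyword list, the number of tokens found in it."""
    return [sum(1 for tkn in tokens if tkn in words) for words in topic_words]


# Hypothetical keyword lists; the question does not show the real ones.
keyword_list = [["union", "workers"], ["farmer", "rally"], ["fashion"]]
tokens = ["become", "farmer", "rally", "workers", "student"]

result = count_topic_hits(tokens, keyword_list)
print(result)  # [1, 2, 0]
```

Checking the logic this way first makes it easier to tell a bug in the function apart from a problem with how the UDF is called.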

I used a udf under withColumn to call the function and I get an error. I think it's about passing an external list to a udf. Is there a way I can pass an external list and the DataFrame column to a udf and add a new column to my DataFrame?

topicWord = udf(label_maker_topic,StringType())


The cleanest solution is to pass additional arguments using a closure:

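from pyspark.sql.functions import col, udf

def make_topic_word(topic_words):
    # topic_words is captured by the lambda, so the UDF itself takes only the column.
    return udf(lambda c: label_maker_topic(c, topic_words))

df = sc.parallelize([(["union"], )]).toDF(["tokens"])

df.withColumn("topics", make_topic_word(keyword_list)(col("tokens")))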

This doesn't require any changes to keyword_list or to the function you wrap with the UDF. You can also use this method to pass an arbitrary object, for example a list of sets for efficient lookups.
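
The list-of-sets idea can be sketched in plain Python, so that it runs without a Spark session. The names `make_topic_counter` and `count_hits` and the keyword lists are hypothetical. The point is that the sets are built once, when the closure is created, and then reused for every row.

```python
def make_topic_counter(topic_words):
    # Convert each keyword list to a set once, when the closure is created.
    topic_sets = [set(words) for words in topic_words]

    def count_hits(tokens):
        # topic_sets is captured from the enclosing scope, not passed per call.
        return [sum(1 for tkn in tokens if tkn in s) for s in topic_sets]

    return count_hits


# Hypothetical keyword lists; the question does not show the real ones.
keyword_list = [["union", "workers"], ["farmer", "rally"], ["fashion"]]

counter = make_topic_counter(keyword_list)
hits = counter(["become", "farmer", "rally", "workers", "student"])
print(hits)  # [1, 2, 0]

# In Spark, the returned function could then be wrapped, for example as
# udf(counter, ArrayType(IntegerType())), and applied to the token column.
```

Set membership is a constant-time lookup on average, so this should help most when the keyword lists are long.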

If you want to use your current UDF and pass topic_words directly, you'll have to convert it to a column literal first:

from pyspark.sql.functions import array, col, lit

ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list])
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show()

Depending on your data and requirements, there can be alternative, more efficient solutions which don't require UDFs (explode + aggregate + collapse) or lookups (hashing + vector operations).
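
To make the shape of the explode + aggregate + collapse approach concrete, here is a plain-Python illustration of the three steps. It is not Spark code: lists and a `Counter` stand in for the DataFrame operations, and the rows and keyword lists are hypothetical. In Spark, the corresponding steps would roughly be an explode of the token column, a join against a keyword table followed by a grouped count, and a final regrouping per id.

```python
from collections import Counter

# Hypothetical keyword lists; the question does not show the real ones.
keyword_list = [["union", "workers"], ["farmer", "rally"], ["fashion"]]

# Hypothetical rows of (id, tokens), standing in for the DataFrame.
rows = [
    ("a", ["become", "farmer", "rally", "workers", "student"]),
    ("b", ["union", "union", "fashion"]),
]

# Lookup table: one (word, topic index) pair per keyword.
word_topics = [(w, i) for i, words in enumerate(keyword_list) for w in words]

# 1. Explode: one (id, token) pair per token.
exploded = [(row_id, tkn) for row_id, tokens in rows for tkn in tokens]

# 2. Join on the word and aggregate: count matches per (id, topic).
counts = Counter(
    (row_id, topic)
    for row_id, tkn in exploded
    for word, topic in word_topics
    if tkn == word
)

# 3. Collapse: one list of counts per id, with zeros for topics without matches.
collapsed = {
    row_id: [counts[(row_id, i)] for i in range(len(keyword_list))]
    for row_id, _ in rows
}
print(collapsed)
```

Whether this beats a UDF depends on the data: the join introduces a shuffle, so it tends to pay off only when the DataFrame is large and the keyword table is small enough to broadcast.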