Andarin Andarin - 1 year ago 176
Scala Question

Register UDF to SqlContext from Scala to use in PySpark

Is it possible to register a UDF (or function) written in Scala to use in PySpark ?

val mytable = sc.parallelize(1 to 2).toDF("spam")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2

In Scala, the following is now possible:

val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3

I would like to use "UDFaddOne" in PySpark like

mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work

Background: We are a team of developpers, some coding in Scala and some in Python, and would like to share already written functions. It would also be possible to save it into a library and import it.

Answer Source

As far as I know PySpark doesn't provide any equivalent of the callUDF function and because of that it is not possible to access registered UDF directly.

The simplest solution here is to use raw SQL expression:

mytable.withColumn("moreSpam", expr("UDFaddOne({})".format("spam")))

## OR
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")

## OR
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")

This approach is rather limited so if you need to support more complex workflows you should build a package and provide complete Python wrappers. You'll find and example UDAF wrapper in my answer to Spark: How to map Python with Scala or Java User Defined Functions?

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download