Andarin Andarin - 3 months ago 37
Scala Question

Register UDF to SqlContext from Scala to use in PySpark

Is it possible to register a UDF (or function) written in Scala to use in PySpark ?
E.g.:

val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2


In Scala, the following is now possible:

val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3


I would like to use "UDFaddOne" in PySpark like

%pyspark
mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work


Background: We are a team of developpers, some coding in Scala and some in Python, and would like to share already written functions. It would also be possible to save it into a library and import it.

Answer

As far as I know PySpark doesn't provide any equivalent of the callUDF function and because of that it is not possible to access registered UDF directly.

The simplest solution here is to use raw SQL expression:

mytable.withColumn("moreSpam", expr("UDFaddOne({})".format("spam")))

## OR
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")

## OR
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")

This approach is rather limited so if you need to support more complex workflows you should build a package and provide complete Python wrappers. You'll find and example UDAF wrapper in my answer to Spark: How to map Python with Scala or Java User Defined Functions?