UrVal - 1 year ago
Scala Question

Class not found when applying udf in Apache Zeppelin

LATER EDIT: It seems the problem is related to the Apache Zeppelin interpreter. I'm using Apache Zeppelin 0.6.0 on Spark 1.6.0. When running the same code in spark-shell (2.0.0) there were no issues.

This might be a bit too specific, but maybe it will help others who get similar errors with UDFs.

I want to create a column in a Spark DataFrame based on another column in that DataFrame and a Seq of strings.
That is, create a column "urban" that holds 1 if the value in column "location" is in the sequence "cities", and 0 otherwise.

I tried solving it in several different ways and always get the same error. The final version is based on these two posts: "Use of Seq.contains(String)" and "Create new column with udf".

This is what I have now:

val cities = Seq("london", "paris")
df.filter(lower($"location") isin (cities : _*)).count()

Long = 5485947
So I do have records with those two locations.

import org.apache.spark.sql.functions._
val urbanFlag: (String => Int) = (arg: String) => {if (cities.contains(arg)) 1 else 0}
val urbf = udf(urbanFlag)
df.withColumn("urban", urbf(lower($"location"))).show(100)

When I run this I get "Job aborted due to stage failure", the error:


...and a huge stacktrace.
My guess is that it has something to do with the anonymous function, but what?

Answer

Maybe there's an issue with the way you're defining the UDF? This works for me:

import org.apache.spark.sql.functions._

val data = sqlContext.read.json(sc.parallelize(Seq("{'location' : 'london'}", "{'location': 'tokyo'}")))

val cities = Seq("london", "paris")
val urbf = udf { city: String => if (cities.contains(city)) 1 else 0 }

data.select($"location", urbf($"location")).show

|  london|            1|
|   tokyo|            0|

Note that I'm defining the UDF directly, i.e. by passing the function literal straight to udf, without an intermediate function value.
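
If the UDF keeps failing in Zeppelin, it may be worth trying a version that needs no UDF at all. The question already shows that a filter using isin runs fine in that setup, so the same membership test can probably be turned into the flag column with when/otherwise. This is only a sketch under a few assumptions: it runs in a Zeppelin (or Spark 1.6 spark-shell) paragraph where sqlContext is predefined, and the "sample" DataFrame is a made-up stand-in for the real df. On Spark 2.x, importing spark.implicits._ instead should be the equivalent.

```scala
import org.apache.spark.sql.functions.{lower, when}
// Assumption: `sqlContext` is the instance the notebook/shell predefines.
import sqlContext.implicits._

val cities = Seq("london", "paris")

// Hypothetical sample data standing in for the real DataFrame.
val sample = Seq(Tuple1("London"), Tuple1("tokyo")).toDF("location")

// isin builds the membership test as a Column expression,
// so no user-defined Scala function is involved in computing the flag.
val flagged = sample.withColumn(
  "urban",
  when(lower($"location").isin(cities: _*), 1).otherwise(0)
)

flagged.show()
```

Rows whose location is null end up with 0 here, because the when condition is not true for them and the otherwise branch applies.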
