Lukáš Lalinský - 3 months ago
Scala Question

Spark Dataset and java.sql.Date

Let's say I have a Spark Dataset like this:

scala> import java.sql.Date
scala> case class Event(id: Int, date: Date, name: String)
scala> val ds = Seq(Event(1, Date.valueOf("2016-08-01"), "ev1"), Event(2, Date.valueOf("2018-08-02"), "ev2")).toDS


I want to create a new Dataset with only the name and date fields. As far as I can see, I can either use ds.select() with TypedColumn, or use ds.select() with Column and then convert the DataFrame to a Dataset.

However, I can't get the former option working with the Date type. For example:

scala> ds.select($"name".as[String], $"date".as[Date])
<console>:31: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
ds.select($"name".as[String], $"date".as[Date])
^


The latter option works:

scala> ds.select($"name", $"date").as[(String, Date)]
res2: org.apache.spark.sql.Dataset[(String, java.sql.Date)] = [name: string, date: date]


Is there a way to select Date fields from a Dataset without going to DataFrame and back?

Answer

Been bashing my head against problems like these for the whole day. I think you can solve your problem with one line:

import org.apache.spark.sql.Encoder
implicit val e: Encoder[(String, Date)] = org.apache.spark.sql.Encoders.kryo[(String, Date)]

At least that has been working for me.

EDIT

In these cases, the problem is that most Dataset operations in Spark 2 require an Encoder that carries schema information (presumably for optimizations). The Encoder is supplied as an implicit parameter, and many Dataset operations take one.
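To make the "schema information" point concrete, here is a minimal sketch that compares the schema carried by a Kryo encoder with the one carried by a tuple encoder built from Encoders.STRING and Encoders.DATE. The object name EncoderSchemas and the value names are made up for illustration; only the Encoders calls come from Spark.

```scala
import java.sql.Date
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical wrapper object, used only for this illustration.
object EncoderSchemas {
  // Kryo serializes the whole tuple into a single opaque binary column,
  // so Spark no longer sees separate name and date fields.
  val kryoEncoder: Encoder[(String, Date)] = Encoders.kryo[(String, Date)]

  // The tuple encoder keeps one typed column per tuple element.
  val tupleEncoder: Encoder[(String, Date)] =
    Encoders.tuple(Encoders.STRING, Encoders.DATE)

  def main(args: Array[String]): Unit = {
    // Print both schemas side by side for comparison.
    println(kryoEncoder.schema.simpleString)
    println(tupleEncoder.schema.simpleString)
  }
}
```

If the columns need to stay queryable as string and date, the tuple encoder looks like the better fit. Kryo is more of a fallback for types that have no built-in encoder.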

In this case, the OP found the correct encoder for java.sql.Date, so the following works:

implicit val e = org.apache.spark.sql.Encoders.DATE
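For completeness, a sketch of the typed select from the question with the encoders passed explicitly rather than through an implicit val. This assumes a local SparkSession. The object and method names (TypedSelectSketch, project) are made up for illustration. Passing Encoders.DATE by hand should also avoid an ambiguous-implicit clash on Spark versions where spark.implicits._ already ships a Date encoder.

```scala
import java.sql.Date
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

// Same case class as in the question; kept top-level so Spark can derive its encoder.
case class Event(id: Int, date: Date, name: String)

// Hypothetical wrapper object, used only for this illustration.
object TypedSelectSketch {
  // Projects name and date as typed columns without a DataFrame round trip.
  def project(spark: SparkSession): Dataset[(String, Date)] = {
    import spark.implicits._

    val ds = Seq(
      Event(1, Date.valueOf("2016-08-01"), "ev1"),
      Event(2, Date.valueOf("2018-08-02"), "ev2")
    ).toDS()

    // as[U] takes an Encoder[U] as an implicit parameter; here it is supplied by hand.
    ds.select(
      $"name".as[String](Encoders.STRING),
      $"date".as[Date](Encoders.DATE)
    )
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a single-threaded local session is enough for the demo.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("typed-select-sketch")
      .getOrCreate()

    project(spark).collect().foreach(println)
    spark.stop()
  }
}
```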