Georg Heiler - 1 year ago
Scala Question

Spark write custom transformer for pipeline

I want to write a custom Transformer for a pipeline in Spark 2.0 in Scala. So far it is not really clear to me what the copy and transformSchema methods should return. Is it correct that they return a Transformer for copy?

As the Transformer extends PipelineStage, I conclude that a fit calls the transformSchema method. Do I understand correctly that transformSchema is similar to sk-learn's fit?

As my Transformer should join the dataset with a (very small) second dataset, I want to store that one in the serialized pipeline as well. How should I store it in the transformer so that it works properly with the pipeline's serialization mechanism?

What would a simple transformer look like that computes the mean for a single column, fills the NaN values, and persists this value?

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer {

  def transform(df: Dataset[MyClass]): DataFrame = ???

  override def copy(extra: ParamMap): Transformer = ???

  override def transformSchema(schema: StructType): StructType = ???
}

Answer

transformSchema should return the schema that is expected after applying the Transformer. Example:

  • If the transformer adds a column of IntegerType, and the output column name is foo:

    import org.apache.spark.sql.types._

    override def transformSchema(schema: StructType): StructType = {
      schema.add(StructField("foo", IntegerType))
    }
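For illustration, a minimal sketch of a complete Transformer honoring that contract, where transform and transformSchema agree on the output schema. The class name AddFoo and the constant value 42 are made up for this example; they are not part of the question's code:

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._

// Hypothetical transformer adding a constant IntegerType column "foo".
// transform() and transformSchema() must describe the same output columns,
// because the Pipeline validates schemas via transformSchema() before running.
class AddFoo(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("addFoo"))

  override def transform(ds: Dataset[_]): DataFrame =
    ds.withColumn("foo", lit(42))

  override def transformSchema(schema: StructType): StructType =
    schema.add(StructField("foo", IntegerType))

  // defaultCopy works because the class has a no-argument constructor
  override def copy(extra: ParamMap): AddFoo = defaultCopy(extra)
}
```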

So if the schema of the dataset is not changed (as only a name value is filled for mean imputation), should I return the schema of the original case class?

That is not possible in Spark SQL (and MLlib, too), since a Dataset is immutable once created. You can only add or "replace" columns (where replacing is an add followed by a drop).
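Putting this together for the mean-imputation case from the question, a hedged sketch follows. The class name MeanImputer is made up, and computing the mean inside transform() is a shortcut: the idiomatic MLlib design would be an Estimator whose fit() returns a Model holding the persisted mean. Since no columns are added or removed, transformSchema is the identity:

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.types.StructType

// Hypothetical mean imputer: fills null/NaN in column `col` with its mean.
class MeanImputer(override val uid: String, col: String) extends Transformer {
  def this(col: String) = this(Identifiable.randomUID("meanImputer"), col)

  override def transform(ds: Dataset[_]): DataFrame = {
    val df = ds.toDF()
    // first() is safe: avg() always yields exactly one row
    val mean = df.select(avg(df(col))).first().getDouble(0)
    // na.fill adds a "replaced" column under the hood; the schema is unchanged
    df.na.fill(mean, Seq(col))
  }

  // No columns are added or removed, so the input schema passes through as-is
  override def transformSchema(schema: StructType): StructType = schema

  override def copy(extra: ParamMap): MeanImputer = new MeanImputer(uid, col)
}
```

Note that na.fill returns a new DataFrame; the original Dataset is untouched, consistent with the immutability described above.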
