Georg Heiler - 14 days ago
Scala Question

Spark write custom transformer for pipeline

I want to write a custom Transformer for a pipeline in Spark 2.0 in Scala. So far it is not really clear to me what the copy or transformSchema methods should return. Is it correct that they return null? See https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java for copy.

As the Transformer extends PipelineStage, I conclude that fit calls the transformSchema method. Do I understand correctly that transformSchema is similar to sklearn's fit?

As my Transformer should join the dataset with a (very small) second dataset, I want to store that one in the serialized pipeline as well. How should I store it in the transformer so that it works properly with the pipeline's serialization mechanism?

What would a simple transformer look like that computes the mean for a single column, fills the NaN values with it, and persists this value?

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer {

  def transform(df: Dataset[MyClass]): DataFrame = {
    ??? // unclear what to return here
  }

  override def copy(extra: ParamMap): Transformer = {
    ??? // unclear what to return here
  }

  override def transformSchema(schema: StructType): StructType = {
    schema
  }
}
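For orientation, here is a minimal sketch of how such an imputer is commonly structured as an Estimator/Model pair: fit computes the mean, the resulting model carries it, copy uses defaultCopy/copyValues, and transformSchema returns the schema unchanged. The names MeanImputer, MeanImputerModel, and imputeCol are made up for illustration, and pipeline persistence would additionally need an MLWritable implementation, which is omitted here.

    import org.apache.spark.ml.{Estimator, Model}
    import org.apache.spark.ml.param.{Param, ParamMap, Params}
    import org.apache.spark.ml.util.Identifiable
    import org.apache.spark.sql.{DataFrame, Dataset}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.StructType

    // Hypothetical shared param: the numeric column whose NaN values are imputed.
    trait HasImputeCol extends Params {
      final val imputeCol = new Param[String](this, "imputeCol", "column to impute")
      final def getImputeCol: String = $(imputeCol)
    }

    // Estimator: computes the mean in fit() and hands it to the model.
    class MeanImputer(override val uid: String) extends Estimator[MeanImputerModel] with HasImputeCol {
      def this() = this(Identifiable.randomUID("meanImputer"))
      def setImputeCol(value: String): this.type = set(imputeCol, value)

      override def fit(dataset: Dataset[_]): MeanImputerModel = {
        val meanValue = dataset.select(avg(col($(imputeCol)))).head().getDouble(0)
        copyValues(new MeanImputerModel(uid, meanValue).setParent(this))
      }

      override def copy(extra: ParamMap): MeanImputer = defaultCopy(extra)

      // The schema is not changed: the imputed column keeps its name and type.
      override def transformSchema(schema: StructType): StructType = schema
    }

    // Model: carries the learned mean and applies it in transform().
    class MeanImputerModel(override val uid: String, val meanValue: Double)
        extends Model[MeanImputerModel] with HasImputeCol {

      override def transform(dataset: Dataset[_]): DataFrame = {
        val c = $(imputeCol)
        dataset.withColumn(c, when(isnan(col(c)), lit(meanValue)).otherwise(col(c)))
      }

      override def copy(extra: ParamMap): MeanImputerModel =
        copyValues(new MeanImputerModel(uid, meanValue), extra).setParent(parent)

      override def transformSchema(schema: StructType): StructType = schema
    }

    // Usage (column name is made up):
    // val model = new MeanImputer().setImputeCol("amount").fit(trainingDf)
    // val imputed = model.transform(testDf)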

Answer

transformSchema should return the schema which is expected after applying the Transformer. Example:

  • If the transformer adds a column of IntegerType, and the output column name is foo:

    import org.apache.spark.sql.types._
    
    override def transformSchema(schema: StructType): StructType = {
       schema.add(StructField("foo", IntegerType))
    }
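
A transform consistent with that schema would then actually add the column; a minimal sketch, where the constant value is only a placeholder:

    import org.apache.spark.sql.functions.lit

    override def transform(dataset: Dataset[_]): DataFrame =
      dataset.withColumn("foo", lit(1)) // adds the IntegerType column declared in transformSchema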
    

So, if the schema of the dataset is not changed, since only a name value is filled in for mean imputation, should I return the original case class as the schema?

That is not possible in Spark SQL (and in MLlib, too), since a Dataset is immutable once created. You can only add or "replace" columns (a replace being an add followed by a drop).
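To illustrate the add/replace/drop pattern (df is any DataFrame; the column names are made up):

    import org.apache.spark.sql.functions._

    val withFoo  = df.withColumn("foo", lit(1))               // add: returns a new Dataset
    val replaced = df.withColumn("name", upper(col("name")))  // "replace": new column under the same name
    val dropped  = withFoo.drop("foo")                        // drop: again a new Dataset
    // df itself is never modified in place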