bourbaki4481472 - 9 months ago
Scala Question

Is it possible to make a generic training pipeline for Random Forest in Spark ML?

I am just starting out with Spark and Spark ML and I am finding it much more difficult than Python and sklearn.

The development time is much greater, so I am wondering if it is feasible to make a generic pipeline that works on any (sufficiently small) dataset and trains a Random Forest classifier. Ideally, I would create a function like

def trainClassifier(df: DataFrame, labelColumn: String) = {
  ...
}


A lot of the development time in Spark is spent on encoding columns to numerical columns and then forming a vector out of the features so that Spark ML's Random Forest can actually work with it. So one ends up writing lines like

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

// Map the string column to numeric category indices
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)

val indexed = indexer.transform(df)

// One-hot encode the indexed column into a sparse vector column
val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")

val encoded = encoder.transform(indexed)
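
…and then a VectorAssembler to actually form that feature vector. A rough sketch, reusing the column names from the snippet above:

import org.apache.spark.ml.feature.VectorAssembler

// Merge the encoded (and any numeric) columns into one "features" vector
val assembler = new VectorAssembler()
  .setInputCols(Array("categoryVec"))
  .setOutputCol("features")

val assembled = assembler.transform(encoded)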


So my question is more of a design question (please direct me to a different site, if appropriate) about how to write a generic training function for classification that works with any DataFrame, but it is also a question about Spark, in the sense that I am asking whether this sort of thing is feasible within Spark at all (so it is partly an API question, which is why I think it fits Stack Overflow).

EDIT: What I mean is that I do not want to specify the columns and manually convert them for every new DataFrame. I want a function trainClassifier which takes in a variety of DataFrames with different columns and different column types: something that iterates over all columns except the labelColumn and compiles them into a feature vector that the classifier can use.
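
For concreteness, a rough sketch of the column discovery I have in mind, assuming that StringType columns are the only ones needing indexing/encoding and the rest are already numeric:

import org.apache.spark.sql.types.StringType

// Sketch: split the non-label columns by type, so string columns can be
// indexed/encoded and numeric columns passed straight to the assembler
val featureFields = df.schema.fields.filterNot(_.name == labelColumn)
val (stringCols, numericCols) =
  featureFields.partition(_.dataType == StringType)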

Answer

You can create a custom Pipeline:

val start = "category"; // can be parameter of method or function
val indexer = new StringIndexer()
               .setInputCol(start )
               .setOutputCol(start + "Index")
               .fit(df)

val encoder = new OneHotEncoder()
               .setInputCol(encoder.outputCol)
               .setOutputCol(start  + "encoded") 

Those steps can live in a function that returns an Array[PipelineStage], e.g. Array(indexer, encoder). You can then write a small helper that concatenates all such arrays and creates the Pipeline (a sketch of one possible helper follows the snippet below):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier

val randomForest = new RandomForestClassifier() // configure label/features columns as needed

val pipeline = new Pipeline()
  .setStages(allStepsArray(indexer, encoder, randomForest))
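
allStepsArray is not part of Spark; a minimal sketch, assuming it simply collects the given stages into the Array[PipelineStage] that setStages expects:

import org.apache.spark.ml.PipelineStage

// Illustrative helper (not a Spark API): flatten stages into one array
def allStepsArray(stages: PipelineStage*): Array[PipelineStage] =
  stages.toArray

For the fully generic case you would build one such array per string column and concatenate them before appending the classifier.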

Then you can call fit on the Pipeline, or even build a CrossValidator around it:

val model = pipeline.fit(trainingData)
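
A minimal CrossValidator sketch, assuming the pipeline and randomForest values from above (the grid values, evaluator, and the trainingData name are illustrative):

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Try a few forest sizes with 3-fold cross-validation
val paramGrid = new ParamGridBuilder()
  .addGrid(randomForest.numTrees, Array(20, 50, 100))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new MulticlassClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

val cvModel = cv.fit(trainingData)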