Jim Hendricks - 1 month ago
Scala Question

Error adding VectorAssembler to Spark ML Pipeline

I'm trying to add a VectorAssembler to the GBT pipeline example and get an error saying the pipeline cannot find the features field. I'm bringing in a sample file instead of a libsvm file, so I needed to transform the feature set.

Error:
Exception in thread "main" java.lang.IllegalArgumentException: Field "features" does not exist.

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data/training_example.csv")

val sampleDF = df.sample(false, 0.05, 987897L)

val assembler = new VectorAssembler()
  .setInputCols(Array("val1","val2","val3",...,"valN"))
  .setOutputCol("features")

val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(sampleDF)

val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(sampleDF)

val Array(trainingData, testData) = sampleDF.randomSplit(Array(0.7, 0.3))

val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(3)
  .setMaxDepth(5)

val pipeline = new Pipeline()
  .setStages(Array(assembler, labelIndexer, featureIndexer, gbt))

val model = pipeline.fit(trainingData)

val predictions = model.transform(testData)

predictions.show(10)

Answer

Why are you calling fit() on featureIndexer?

If you call fit(sampleDF), VectorIndexer looks for a features column in sampleDF, but that dataset has no such column.

You should not call fit() manually; it is done when you call fit() on the pipeline. The pipeline then runs all of its transformers and estimators in order: it applies assembler, passes the result to labelIndexer's fit, and passes that step's output on to featureIndexer's fit.

The DataFrame passed to featureIndexer.fit() inside the Pipeline will contain all columns generated by the previous stages.

Remember that if you use VectorIndexer standalone, you must fit a model on one dataset and then use that model to transform new data. If you use VectorIndexer inside a pipeline, don't call fit() on it yourself; it will be called by Pipeline.fit().
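For the standalone case, a minimal sketch might look like the following. The object and method names (StandaloneIndexerSketch, indexFeatures) and the parameters featureCols, fitDF and newDF are made up for illustration; the point is only that the features column has to be assembled before VectorIndexer.fit() sees the data.

```scala
import org.apache.spark.ml.feature.{VectorAssembler, VectorIndexer, VectorIndexerModel}
import org.apache.spark.sql.DataFrame

object StandaloneIndexerSketch {
  // Hypothetical helper: featureCols stands in for your own list of input columns,
  // fitDF is the data the indexer is fitted on, newDF is the data to transform later.
  def indexFeatures(featureCols: Array[String], fitDF: DataFrame, newDF: DataFrame): DataFrame = {
    val assembler = new VectorAssembler()
      .setInputCols(featureCols)
      .setOutputCol("features")

    // Create the "features" column first; VectorIndexer.fit() requires it.
    val assembledFit = assembler.transform(fitDF)

    // Fit once, on data that already contains "features".
    val indexerModel: VectorIndexerModel = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(4)
      .fit(assembledFit)

    // Reuse the fitted model on new data, assembled the same way.
    indexerModel.transform(assembler.transform(newDF))
  }
}
```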

In your code, sampleDF doesn't have a features column; during Pipeline.fit(), however, this column will be added by the assembler stage.
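Put together, the pipeline from the question could be built roughly like this, with the estimators left unfitted so that Pipeline.fit() fits them in order. This is a sketch, not tested against your data: GbtPipelineSketch, buildPipeline and featureCols are names invented here, and featureCols stands in for your own column list.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, VectorIndexer}

object GbtPipelineSketch {
  // Hypothetical helper: featureCols stands in for the question's input column list.
  def buildPipeline(featureCols: Array[String]): Pipeline = {
    // Transformer: adds the "features" vector column when the pipeline runs.
    val assembler = new VectorAssembler()
      .setInputCols(featureCols)
      .setOutputCol("features")

    // Unfitted estimator: no .fit(sampleDF) here.
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")

    // Unfitted estimator: it is fitted inside Pipeline.fit(), after assembler
    // has produced "features".
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(4)

    val gbt = new GBTClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("indexedFeatures")
      .setMaxIter(3)
      .setMaxDepth(5)

    new Pipeline()
      .setStages(Array[PipelineStage](assembler, labelIndexer, featureIndexer, gbt))
  }
}
```

You would then call GbtPipelineSketch.buildPipeline(...) with your column names, fit the result on trainingData and transform testData as before. One thing to keep in mind: labelIndexer is now fitted on trainingData only, so a label value that appears only in testData may not be handled by the fitted StringIndexer.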