Josh Yaxley Josh Yaxley - 9 months ago 46
Scala Question

Cannot pass arrays from MongoDB into Spark Machine Learning functions that require Vectors

My use case:

Read data from a MongoDB collection of the form:

"_id" : ObjectId("582cab1b21650fc72055246d"),
"label" : 167.517838916715,
"features" : [

And pass it to the class to create a model for predictions.

My problem:

The Spark connector reads in "features" as Array[Double]. expects a DataSet with a Label column and a Features column.

The Features column must be of type VectorUDT (so DenseVector or SparseVector will work).

I cannot .map features from Array[Double] to DenseVector because there is no relevant Encoder:

Error:(23, 11) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
.map{case Row(label: Double, features: Array[Double]) => Row(label, Vectors.dense(features))}

Custom Encoders cannot be defined.

My question:

  • Is there a way I can set the configuration of the Spark connector to
    read in the "features" array as a Dense/SparseVector?

  • Is there any
    other way I can achieve this (without, for example, using an
    intermediary .csv file and loading that using libsvm)?

My code:

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.{Row, SparkSession}

case class DataPoint(label: Double, features: Array[Double])

object LinearRegressionWithMongo {
def main(args: Array[String]) {
val spark = SparkSession
.config("spark.mongodb.input.uri", "mongodb://")

import spark.implicits._

val dataPoints = MongoSpark.load(spark)
.map{case Row(label: Double, features: Array[Double]) => Row(label, Vectors.dense(features))}

val splitData = dataPoints.randomSplit(Array(0.7, 0.3), 42)
val training = splitData(0)
val test = splitData(1)

val linearRegression = new LinearRegression()

// Train the model
val startTime = System.nanoTime()
val linearRegressionModel =
val elapsedTime = (System.nanoTime() - startTime) / 1e9
println(s"Training time: $elapsedTime seconds")

// Print the weights and intercept for linear regression.
println(s"Weights: ${linearRegressionModel.coefficients} Intercept: ${linearRegressionModel.intercept}")

val modelEvaluator = new ModelEvaluator()
println("Training data results:")
modelEvaluator.evaluateRegressionModel(linearRegressionModel, training, "label")
println("Test data results:")
modelEvaluator.evaluateRegressionModel(linearRegressionModel, test, "label")


Any help would be ridiculously appreciated!

Answer Source

There is quick fix for this. If data has been loaded into a DataFrame called df which has:

  • id - SQL double.
  • features - SQL array<double>.

like this one

val df = Seq((1.0, Array(2.3, 3.4, 4.5))).toDF("id", "features")

you select columns you need for downstream processing:

val idAndFeatures ="id", "features")

convert to statically typed Dataset:

val tuples =[(Double, Seq[Double])]

map and convert back to Dataset[Row]:

import { case (id, features) => 
  (id, Vectors.dense(features.toArray))
}.toDF("id", "features")