Josh Yaxley - 18 days ago
Scala Question

Cannot pass arrays from MongoDB into Spark Machine Learning functions that require Vectors

My use case:



Read data from a MongoDB collection of the form:

{
    "_id" : ObjectId("582cab1b21650fc72055246d"),
    "label" : 167.517838916715,
    "features" : [
        10.0964787450654,
        218.621137772497,
        18.8833848806122,
        11.8010251302327,
        1.67037687829152,
        22.0766170950477,
        11.7122322171201,
        12.8014773524475,
        8.30441804118235,
        29.4821268054137
    ]
}


And pass it to the org.apache.spark.ml.regression.LinearRegression class to create a model for predictions.

My problem:



The Spark connector reads in "features" as Array[Double].

LinearRegression.fit(...) expects a Dataset with a label column and a features column.

The Features column must be of type VectorUDT (so DenseVector or SparseVector will work).

I cannot .map features from Array[Double] to DenseVector because there is no relevant Encoder:

Error:(23, 11) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
.map{case Row(label: Double, features: Array[Double]) => Row(label, Vectors.dense(features))}


Custom Encoders cannot be defined.

My question:




  • Is there a way I can set the configuration of the Spark connector to
    read in the "features" array as a Dense/SparseVector?

  • Is there any other way I can achieve this (without, for example,
    using an intermediary .csv file and loading that using libsvm)?



My code:



import com.mongodb.spark.MongoSpark
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.{Row, SparkSession}

case class DataPoint(label: Double, features: Array[Double])

object LinearRegressionWithMongo {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("LinearRegressionWithMongo")
      .master("local[4]")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/LinearRegressionTest.DataPoints")
      .getOrCreate()

    import spark.implicits._

    val dataPoints = MongoSpark.load(spark)
      .map{case Row(label: Double, features: Array[Double]) => Row(label, Vectors.dense(features))}

    val splitData = dataPoints.randomSplit(Array(0.7, 0.3), 42)
    val training = splitData(0)
    val test = splitData(1)

    val linearRegression = new LinearRegression()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setRegParam(0.0)
      .setElasticNetParam(0.0)
      .setMaxIter(100)
      .setTol(1e-6)

    // Train the model
    val startTime = System.nanoTime()
    val linearRegressionModel = linearRegression.fit(training)
    val elapsedTime = (System.nanoTime() - startTime) / 1e9
    println(s"Training time: $elapsedTime seconds")

    // Print the weights and intercept for linear regression.
    println(s"Weights: ${linearRegressionModel.coefficients} Intercept: ${linearRegressionModel.intercept}")

    val modelEvaluator = new ModelEvaluator()
    println("Training data results:")
    modelEvaluator.evaluateRegressionModel(linearRegressionModel, training, "label")
    println("Test data results:")
    modelEvaluator.evaluateRegressionModel(linearRegressionModel, test, "label")

    spark.stop()
  }
}


Any help would be ridiculously appreciated!

Answer

There is a quick fix for this. Suppose the data has been loaded into a DataFrame called df with these columns:

  • id - SQL double.
  • features - SQL array<double>.

For example:

val df = Seq((1.0, Array(2.3, 3.4, 4.5))).toDF("id", "features")

Select the columns you need for downstream processing:

val idAndFeatures = df.select("id", "features")

Convert to a statically typed Dataset:

val tuples = idAndFeatures.as[(Double, Seq[Double])]

Map, then convert back to a Dataset[Row]:

import org.apache.spark.ml.linalg.Vectors

tuples.map { case (id, features) => 
  (id, Vectors.dense(features.toArray))
}.toDF("id", "features")
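
Applied to the schema in the question (a "label" column instead of "id"), the same steps might look like the sketch below. It is only an illustration under a few assumptions: the object name ArrayToVectorSketch and the helper toLabeledVectors are made up for this example, the tiny in-memory DataFrame stands in for MongoSpark.load(spark), and the LinearRegression settings are left at their defaults. Selecting only "label" and "features" also drops the "_id" column that comes back from MongoDB.

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.{DataFrame, SparkSession}

object ArrayToVectorSketch {

  // Hypothetical helper (not part of Spark or the MongoDB connector):
  // turns (label: double, features: array<double>) into
  // (label: double, features: vector) using the typed-Dataset route.
  def toLabeledVectors(raw: DataFrame): DataFrame = {
    val spark = raw.sparkSession
    import spark.implicits._

    raw.select("label", "features")   // keep only what the model needs
      .as[(Double, Seq[Double])]      // array columns are read back as Seq
      .map { case (label, features) => (label, Vectors.dense(features.toArray)) }
      .toDF("label", "features")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ArrayToVectorSketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Assumption: in-memory stand-in for MongoSpark.load(spark).
    val raw = Seq(
      (3.0, Array(1.0, 0.0)),
      (4.0, Array(0.0, 1.0)),
      (6.0, Array(1.0, 1.0)),
      (8.0, Array(2.0, 1.0)),
      (9.0, Array(1.0, 2.0))
    ).toDF("label", "features")

    val prepared = toLabeledVectors(raw)
    prepared.printSchema()            // "features" is now a vector column

    val model = new LinearRegression()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .fit(prepared)

    println(s"Weights: ${model.coefficients} Intercept: ${model.intercept}")

    spark.stop()
  }
}
```

In the original program, the result of MongoSpark.load(spark) would be passed to the helper in place of raw, and randomSplit would be called on the returned DataFrame.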