schoon - 1 month ago
Scala Question

How do I display predictions, labels and dataframe column in Spark/Scala?

I am performing a naive Bayes classification in Spark/Scala. It seems to work OK; the code is:

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.feature.StringIndexer

val dfLemma2 = dfLemma.withColumn("racist", 'racist.cast("String"))

val indexer = new StringIndexer().setInputCol("racist").setOutputCol("indexracist")
val indexed = indexer.fit(dfLemma2).transform(dfLemma2)
indexed.show()


val hashingTF = new HashingTF()
  .setInputCol("lemma").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(indexed)

val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("features", "indexracist").take(3).foreach(println)
val changedTypedf = rescaledData.withColumn("indexracist", 'indexracist.cast("double"))
changedTypedf.show()

// val labeled = changedTypedf.map(row => LabeledPoint(row(0), row.getAs[Vector](4)))

val labeled = changedTypedf.select("indexracist","features").rdd.map(row => LabeledPoint(
row.getAs[Double]("indexracist"),
org.apache.spark.mllib.linalg.Vectors.fromML(row.getAs[org.apache.spark.ml.linalg.SparseVector]("features"))
))


import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.util.MLUtils
// Split data into training (60%) and test (40%).
val Array(training, test) = labeled.randomSplit(Array(0.6, 0.4))

val model = NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")

val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))

predictionAndLabel.take(100)


This outputs:

res330: Array[(Double, Double)] = Array((0.0,0.0), (0.0,0.0), (0.0,0.0), (0.0,0.0),


which I assume is an array of (prediction, label) pairs.
What I would like to output is these pairs joined to the original text, which was a column called lemma in the training DataFrame, so something like:

-----------------------------------------------
| Prediction | Label | lemma                  |
-----------------------------------------------
| 0.0        | 0.0   | [cakes, are, good]     |
| 0.0        | 0.0   | [jim, says, hi]        |
| 1.0        | 1.0   | [shut, the, dam, door] |
...
-----------------------------------------------


Any pointers are appreciated as my Spark/Scala is weak.

EDIT: The text column is called 'lemma' in 'indexed':

+------+-------------------------------------------------------------------------------------------------------------------+
|racist|lemma |
+------+-------------------------------------------------------------------------------------------------------------------+
|true |[@cllrwood, abbo, @ukip, britainfirst] |
|false |[objectofthemonth, george, lansbury, bust, jussuf, abbo, amp, fascinating, insight, son, jerome] |
|false |[nowplay, one, night, stand, van, brave, @bbraveofficial, bbravesquad, abbo, safe] |
|false |[@mahesh, weet, son, satyamurthy, kante, abbo, chana, better, aaamovie] |

Answer

As some other answers said, since Spark 2.0 it is recommended to use the ml package instead of the mllib package.
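If you would rather keep the mllib code from the question, one option is to carry lemma through the RDD next to each LabeledPoint and attach it again after predicting. Below is a minimal sketch of that idea, not a drop-in replacement: it assumes a toy three-row DataFrame in place of your data and an indexracist column that is already a double. The object name MllibWithLemma, the run helper and the sample rows are made up for illustration.

```scala
import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.{DataFrame, SparkSession}

object MllibWithLemma {
  // Hypothetical helper: builds toy data, trains, and returns (Prediction, Label, lemma).
  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Toy stand-in for the question's data; indexracist is already numeric here.
    val df = Seq(
      (0.0, Seq("cakes", "are", "good")),
      (0.0, Seq("jim", "says", "hi")),
      (1.0, Seq("shut", "the", "dam", "door"))
    ).toDF("indexracist", "lemma")

    // Same TF-IDF steps as in the question.
    val tf = new HashingTF()
      .setInputCol("lemma").setOutputCol("rawFeatures").setNumFeatures(20)
      .transform(df)
    val rescaled = new IDF()
      .setInputCol("rawFeatures").setOutputCol("features")
      .fit(tf).transform(tf)

    // Keep the lemma tokens paired with each LabeledPoint instead of dropping them.
    val labeledWithText = rescaled.select("indexracist", "features", "lemma").rdd.map { row =>
      val point = LabeledPoint(
        row.getAs[Double]("indexracist"),
        Vectors.fromML(row.getAs[MLVector]("features")))
      (row.getAs[Seq[String]]("lemma"), point)
    }

    // The toy data is too small to split, so this trains and predicts on the same rows.
    val model = NaiveBayes.train(labeledWithText.map(_._2), lambda = 1.0, modelType = "multinomial")

    labeledWithText
      .map { case (lemma, p) => (model.predict(p.features), p.label, lemma) }
      .toDF("Prediction", "Label", "lemma")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("mllib-with-lemma").getOrCreate()
    run(spark).show(false)
    spark.stop()
  }
}
```

On real data you would call randomSplit on labeledWithText, train on training.map(_._2), and run the final map over the test part only.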

Once your code is rewritten with the ml package, the answer to your question becomes straightforward: the transformed DataFrame keeps the original columns, so you only need to select the ones you want.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.feature.{HashingTF, IDF, StringIndexer}

val dfLemma2 = dfLemma.withColumn("racist", 'racist.cast("String"))

val indexer = new StringIndexer().setInputCol("racist").setOutputCol("indexracist")

val hashingTF = new HashingTF()
  .setInputCol("lemma")
  .setOutputCol("rawFeatures")
  .setNumFeatures(20)

val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")

val naiveBayes = new NaiveBayes()
  .setLabelCol("indexracist")
  .setFeaturesCol("features")
  .setModelType("multinomial")
  .setSmoothing(1.0)

val pipeline = new Pipeline().setStages(Array(indexer, hashingTF, idf, naiveBayes))

val Array(training, test) = dfLemma2.randomSplit(Array(0.6, 0.4))

val model = pipeline.fit(training)

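// "prediction" is the default output column of NaiveBayes; "lemma" passes through the pipeline unchanged
val predictionAndLabel = model.transform(test).select('prediction, 'racist, 'indexracist, 'lemma)

// show() prints a table; truncate = false keeps the full lemma arrays visible
predictionAndLabel.show(100, truncate = false)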
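To get exactly the three columns from the question, the selected columns can be renamed with as before printing them. Here is a self-contained sketch of the same pipeline on toy data. It assumes a local SparkSession; the object name ShowPredictions, the run helper and the sample rows are hypothetical, and it trains and scores on the same three rows only to stay short.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.feature.{HashingTF, IDF, StringIndexer}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object ShowPredictions {
  // Hypothetical helper: fits the pipeline on toy data and returns (Prediction, Label, lemma).
  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Toy stand-in for dfLemma2: a string label plus the tokenized text.
    val dfLemma2 = Seq(
      ("false", Seq("cakes", "are", "good")),
      ("false", Seq("jim", "says", "hi")),
      ("true", Seq("shut", "the", "dam", "door"))
    ).toDF("racist", "lemma")

    val indexer = new StringIndexer().setInputCol("racist").setOutputCol("indexracist")
    val hashingTF = new HashingTF()
      .setInputCol("lemma").setOutputCol("rawFeatures").setNumFeatures(20)
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    val naiveBayes = new NaiveBayes().setLabelCol("indexracist").setFeaturesCol("features")

    val model = new Pipeline()
      .setStages(Array(indexer, hashingTF, idf, naiveBayes))
      .fit(dfLemma2)

    // transform() appends columns, so lemma is still there; rename to match the requested header.
    model.transform(dfLemma2).select(
      col("prediction").as("Prediction"),
      col("indexracist").as("Label"),
      col("lemma"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("show-predictions").getOrCreate()
    run(spark).show(false) // false disables truncation of the lemma arrays
    spark.stop()
  }
}
```

With your own data, replace the toy DataFrame with dfLemma2, fit on the training split, and call transform on the test split as in the code above.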

Hope this helps; if not, leave a comment describing the problem.