user1733690 - 4 months ago
Scala Question

Can't run LDA on Dataset[(scala.Long, org.apache.spark.mllib.linalg.Vector)] in Spark 2.0

I am following this tutorial video on an LDA example, and I'm getting the following error:

<console>:37: error: overloaded method value run with alternatives:
(documents: org.apache.spark.api.java.JavaPairRDD[java.lang.Long,org.apache.spark.mllib.linalg.Vector])org.apache.spark.mllib.clustering.LDAModel <and>
(documents: org.apache.spark.rdd.RDD[(scala.Long, org.apache.spark.mllib.linalg.Vector)])org.apache.spark.mllib.clustering.LDAModel
cannot be applied to (org.apache.spark.sql.Dataset[(scala.Long, org.apache.spark.mllib.linalg.Vector)])
val model = run(lda_countVector)
^


So I want to convert this DataFrame to an RDD, but the result always comes back as a Dataset for me. Can anyone please look into this issue?

// Convert DF to RDD
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row

val lda_countVector = countVectors.map { case Row(id: Long, countVector: Vector) => (id, countVector) }
// import org.apache.spark.mllib.linalg.Vector
// lda_countVector: org.apache.spark.sql.Dataset[(Long, org.apache.spark.mllib.linalg.Vector)] = [_1: bigint, _2: vector]
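For comparison, map on a DataFrame goes through the Dataset API in Spark 2.0, which is why the assignment above comes out as a Dataset; mapping over the DataFrame's underlying RDD yields an RDD instead. A minimal sketch of the two routes, assuming countVectors is the DataFrame from the tutorial (a Long id column and an mllib Vector column):

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row

// map on the DataFrame stays in the Dataset API:
val asDataset = countVectors.map { case Row(id: Long, v: Vector) => (id, v) }
// asDataset: org.apache.spark.sql.Dataset[(Long, Vector)]

// map on the underlying RDD[Row] stays in the RDD API:
val asRDD = countVectors.rdd.map { case Row(id: Long, v: Vector) => (id, v) }
// asRDD: org.apache.spark.rdd.RDD[(Long, Vector)]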

Answer

OK, here is the thing: there does seem to be an issue with calling LDA's run on the mapped result.

@LostInOverflow's answer should be correct, but in the meantime here is the temporary fix that I suggest, by example:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.sql.Row

val a = Vectors.dense(Array(1.0, 2.0, 3.0))
val b = Vectors.dense(Array(3.0, 4.0, 5.0))
val df = Seq((1L, a), (2L, b), (2L, a)).toDF

val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) }

val model = new LDA().setK(3).run(ldaDF) // this will generate the following error:
// <console>:37: error: overloaded method value run with alternatives:
//  (documents: org.apache.spark.api.java.JavaPairRDD[java.lang.Long,org.apache.spark.mllib.linalg.Vector])org.apache.spark.mllib.clustering.LDAModel <and>
//  (documents: org.apache.spark.rdd.RDD[(scala.Long, org.apache.spark.mllib.linalg.Vector)])org.apache.spark.mllib.clustering.LDAModel
// cannot be applied to (org.apache.spark.sql.Dataset[(scala.Long, org.apache.spark.mllib.linalg.Vector)])
//         val model = new LDA().setK(3).run(ldaDF)
//                                           ^

Actually, the error says it all: run accepts a JavaPairRDD(...) or an RDD(...), just not a Dataset(...), so you can do the following for now:

val model = new LDA().setK(3).run(ldaDF.javaRDD)
// model: org.apache.spark.mllib.clustering.LDAModel = org.apache.spark.mllib.clustering.DistributedLDAModel@116b7837
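Alternatively, since the second overload in the error message takes an RDD[(Long, Vector)], dropping down to the DataFrame's underlying RDD before the map should satisfy run directly. A minimal sketch, using the same df as above:

// df.rdd gives an RDD[Row], so the mapped result matches the
// RDD[(Long, Vector)] overload of run:
val ldaRDD = df.rdd.map { case Row(id: Long, countVector: Vector) => (id, countVector) }
val modelFromRDD = new LDA().setK(3).run(ldaRDD)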

PS: For next time, please provide the error messages in your question, because without them your question was misleading.
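Side note: Spark 2.x also ships a DataFrame-based LDA in the ml package that accepts a Dataset directly, which would avoid the conversion altogether. A minimal sketch, assuming a spark-shell session where spark.implicits._ is in scope (note that it uses the newer ml linalg Vectors, not the mllib ones):

import org.apache.spark.ml.clustering.{LDA => MLLDA}
import org.apache.spark.ml.linalg.Vectors

// The ml LDA reads its input from a DataFrame column (default name: "features"):
val mlDF = Seq(
  (1L, Vectors.dense(1.0, 2.0, 3.0)),
  (2L, Vectors.dense(3.0, 4.0, 5.0)),
  (3L, Vectors.dense(1.0, 2.0, 3.0))
).toDF("id", "features")

val mlModel = new MLLDA().setK(3).fit(mlDF)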