João_testeSW - 1 month ago
Scala Question

Get Cluster_ID and the rest of table using Spark MLlib KMeans

I have this dataset (showing only a few rows):

11.97,1355,401
3.49,25579,12908
9.29,129186,10882
28.73,10153,22356
3.69,22872,9798
13.49,160371,2911
24.36,106764,867
3.99,163670,16397
19.64,132547,401


I'm trying to assign all these rows to 4 clusters using K-Means. For that I'm using the code from this post: Spark MLLib Kmeans from dataframe, and back again

val data = sc.textFile("/user/cloudera/TESTE1")
val idPointRDD = data.map(s => (s(0), Vectors.dense(s(1).toInt,s(2).toInt))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 4, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)
val idCluster = idClusterRDD.toDF("purchase","id","product","cluster")


I'm getting this output:

scala> import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> val data = sc.textFile("/user/cloudera/TESTE")
data: org.apache.spark.rdd.RDD[String] = /user/cloudera/TESTE MapPartitionsRDD[7] at textFile at <console>:29

scala> val idPointRDD = data.map(s => (s(0), Vectors.dense(s(1).toInt,s(2).toInt))).cache()
idPointRDD: org.apache.spark.rdd.RDD[(Char, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[8] at map at <console>:31


But when I run it I'm getting the following error:

java.lang.UnsupportedOperationException: Schema for type Char is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:715)


How can I solve this problem?

Many thanks!

Answer

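Here is the thing: you are reading a CSV file into an RDD of String and never splitting each line into its fields, so the values are not converted to numbers properly. A String is a collection of characters, so s(0) returns the first Char of the line, not the first column, and s(1).toInt converts a single Char to its character code instead of parsing a number. That compiles and runs, but it is not what you are looking for. It is also why your RDD has type RDD[(Char, Vector)], and that Char is what toDF rejects with "Schema for type Char is not supported".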
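To make the difference concrete, here is a small plain-Scala sketch (no Spark needed) that uses the first row of the dataset. The column names purchase, id and product are taken from the toDF call in the question; the other value names are only illustrative.

```scala
// First row of the dataset from the question
val line = "11.97,1355,401"

// Indexing a String returns a single Char, not a CSV column
val firstChar: Char = line(0)        // the character '1'
val charCode: Int = line(1).toInt    // the character code of '1', not a parsed number

// Splitting on the comma gives the actual fields as Strings
val fields: Array[String] = line.split(",")
val purchase: Double = fields(0).toDouble
val id: Int = fields(1).toInt
val product: Int = fields(2).toInt

println(s"firstChar=$firstChar charCode=$charCode")
println(s"purchase=$purchase id=$id product=$product")
```

The first two values show what the original map was actually working with; the last three are the values K-Means should see.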

You need to split each line of your data: RDD[String] on the comma and parse the fields:

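 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.rdd.RDD

 val data: RDD[String] = ???  // your input lines, e.g. the result of sc.textFile
 val idPointRDD = data.map { s =>
   // Split the CSV line into its three fields and parse each one as a number
   s.split(",") match {
     case Array(x, y, z) =>
       Vectors.dense(x.toDouble, Integer.parseInt(y).toDouble, Integer.parseInt(z).toDouble)
   }
 }.cache()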

This should work for you!
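If you also want the cluster ID next to the original columns, which is what the title asks for, one option is to keep the parsed row and call predict once per row instead of zipping two RDDs. The following is only a sketch under a few assumptions: it is meant for the spark-shell, where sc and the implicits needed by toDF are already in scope; it clusters on all three columns, which may not be the feature set you want; and the names rows, features, model and clustered are illustrative.

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Assumption: running in the spark-shell, so `sc` and the toDF implicits exist
val data = sc.textFile("/user/cloudera/TESTE1")

// Parse each CSV line into a (purchase, id, product) tuple
val rows = data.map { line =>
  line.split(",") match {
    case Array(purchase, id, product) =>
      (purchase.toDouble, id.toInt, product.toInt)
  }
}.cache()

// Assumption: all three columns are used as features
val features = rows.map { case (purchase, id, product) =>
  Vectors.dense(purchase, id.toDouble, product.toDouble)
}

// Same settings as in the question: 4 clusters, 20 iterations
val model = KMeans.train(features, 4, 20)

// Predict per row so every row keeps its own columns plus its cluster
val clustered = rows.map { case (purchase, id, product) =>
  val cluster = model.predict(Vectors.dense(purchase, id.toDouble, product.toDouble))
  (purchase, id, product, cluster)
}

val idCluster = clustered.toDF("purchase", "id", "product", "cluster")
```

Because every element is now a tuple of Double and Int values, toDF has a schema for each column and the Char error should no longer appear. Note that a line that does not have exactly three comma-separated fields would raise a MatchError in the parsing step.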
