Asked by Quantad - 2 months ago
Scala Question

Spark Scala: Split each line between multiple RDDs

I have a file on HDFS in the form of:

61,139,75
63,140,77
64,129,82
68,128,56
71,140,47
73,141,38
75,128,59
64,129,61
64,129,80
64,129,99


I create an RDD from it and zip the elements with their index:

import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("hdfs://localhost:54310/usrp/sample.txt")
val points = data.map(s => Vectors.dense(s.split(',').map(_.toDouble)))
val indexed = points.zipWithIndex()
val indexedData = indexed.map { case (value, index) => (index, value) }
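The indexing step can be sketched in plain Scala without a SparkContext (Seq standing in for the RDD; the names here are illustrative, not Spark API):

```scala
// Minimal sketch of the zipWithIndex + swap pattern above: each parsed row
// is paired with its position, and the swap puts the index first, mirroring
// indexed.map { case (value, index) => (index, value) }.
object IndexSwapSketch {
  def index(rows: Seq[Array[Double]]): Seq[(Long, Array[Double])] =
    rows.zipWithIndex.map { case (value, i) => (i.toLong, value) }
}
```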


Now I need to create rdd1 with the index and the first two elements of each line, and rdd2 with the index and the third element of each row. I am new to Scala; can you please help me with how to do this?

The following does not work, since y is not a Scala collection Vector but an org.apache.spark.mllib.linalg.Vector, which has no take method:


val rdd1 = indexedData.map{case (x,y) => (x,y.take(2))}


Basically, how do I get the first two elements of such a vector?

Thanks.

Answer

You can make use of DenseVector's unapply method to extract the underlying Array[Double] in your pattern match, call take/drop on that Array, and re-wrap the result with Vectors.dense:

val rdd1 = indexedData.map { case (i, DenseVector(arr)) => (i, Vectors.dense(arr.take(2))) }
val rdd2 = indexedData.map { case (i, DenseVector(arr)) => (i, Vectors.dense(arr.drop(2))) }
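Note that the DenseVector(arr) extractor only matches dense vectors; a sparse row would not match. The split itself happens on the plain Array[Double], which can be sketched without Spark:

```scala
// Plain-Scala sketch of the split performed inside the pattern match above:
// the first two elements go to one pair, the remaining elements to the other.
object SplitSketch {
  def split(arr: Array[Double]): (Array[Double], Array[Double]) =
    (arr.take(2), arr.drop(2))
}
```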

As you can see, this means the original DenseVector you created isn't really that useful, so if you're not going to use indexedData anywhere else, it might be better to create indexedData as an RDD[(Long, Array[Double])] in the first place:

import org.apache.spark.rdd.RDD

val points = data.map(s => s.split(',').map(_.toDouble))
val indexedData: RDD[(Long, Array[Double])] = points.zipWithIndex().map(_.swap)

val rdd1 = indexedData.mapValues(arr => Vectors.dense(arr.take(2)))
val rdd2 = indexedData.mapValues(arr => Vectors.dense(arr.drop(2))) 
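The whole second variant can be exercised end to end on plain collections (Seq standing in for RDD, and map in place of mapValues; the object name is illustrative), using sample lines from the question:

```scala
// End-to-end sketch of the answer's second variant: parse CSV lines to
// Array[Double], attach indices, then split each row into its first two
// elements (rdd1) and the remainder (rdd2).
object PipelineSketch {
  def run(lines: Seq[String]): (Seq[(Long, Array[Double])], Seq[(Long, Array[Double])]) = {
    val points = lines.map(_.split(',').map(_.toDouble))
    val indexedData = points.zipWithIndex.map { case (v, i) => (i.toLong, v) }
    val rdd1 = indexedData.map { case (i, arr) => (i, arr.take(2)) }
    val rdd2 = indexedData.map { case (i, arr) => (i, arr.drop(2)) }
    (rdd1, rdd2)
  }
}
```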

Last tip: you probably want to call .cache() on indexedData before scanning it twice to create rdd1 and rdd2; otherwise the file will be loaded and parsed twice.