joakimj - 1 year ago
Scala Question

How to aggregate a Spark data frame to get a sparse vector using Scala?

I have a data frame like the one below in Spark, and I want to group it by the id column; then, for each group, I need to create a sparse vector with elements from the weight column at the indices specified by the index column. The length of the sparse vector is known, say 1000 for this example.

Dataframe df:

+-----+------+-----+
| id|weight|index|
+-----+------+-----+
|11830| 1| 8|
|11113| 1| 3|
| 1081| 1| 3|
| 2654| 1| 3|
|10633| 1| 3|
|11830| 1| 28|
|11351| 1| 12|
| 2737| 1| 26|
|11113| 3| 2|
| 6590| 1| 2|
+-----+------+-----+


I have read this answer, which is sort of similar to what I want to do, but for an RDD. Does anyone know of a good way to do this for a data frame in Spark using Scala?

My attempt so far is to first collect the weights and indices as lists like this:

val dfWithLists = df
  .groupBy("id")
  .agg(collect_list("weight") as "weights", collect_list("index") as "indices")


which looks like:

+-----+---------+----------+
| id| weights| indices|
+-----+---------+----------+
|11830| [1, 1]| [8, 28]|
|11113| [1, 3]| [3, 2]|
| 1081| [1]| [3]|
| 2654| [1]| [3]|
|10633| [1]| [3]|
|11351| [1]| [12]|
| 2737| [1]| [26]|
| 6590| [1]| [2]|
+-----+---------+----------+


Then I define a udf and do something like this:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

def toSparseVector: ((Array[Int], Array[BigInt]) => Vector) = {(a1, a2) => Vectors.sparse(1000, a1, a2.map(x => x.toDouble))}
val udfToSparseVector = udf(toSparseVector)

val dfWithSparseVector = dfWithLists.withColumn("SparseVector", udfToSparseVector($"indices", $"weights"))


but this doesn't seem to work, and it feels like there should be an easier way that doesn't require collecting the weights and indices into lists first.

I'm pretty new to Spark, Dataframes and Scala, so any help is highly appreciated.

Answer Source

You have to collect them, as vectors must be local to a single machine: https://spark.apache.org/docs/latest/mllib-data-types.html#local-vector

For creating the sparse vectors you have two options: passing unordered (index, value) pairs, or specifying separate indices and values arrays: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.Vectors$
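To illustrate, both overloads produce the same local vector for the id 11113 group from your table (a small sketch using the mllib local vector API, no SparkContext needed):

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Option 1: unordered (index, value) pairs -- convenient when the pairs
// come straight out of a groupBy, since no sorting is required.
val fromPairs: Vector = Vectors.sparse(1000, Seq((3, 1.0), (2, 3.0)))

// Option 2: separate indices and values arrays -- the indices must be
// in ascending order and both arrays must have the same length.
val fromArrays: Vector = Vectors.sparse(1000, Array(2, 3), Array(3.0, 1.0))

println(fromPairs == fromArrays)  // true: both encode (1000,[2,3],[3.0,1.0])
```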

If you can get the data into a different format (pivoted), you could also make use of the VectorAssembler: https://spark.apache.org/docs/latest/ml-features.html#vectorassembler
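A hypothetical sketch of the pivoted route, assuming a local SparkSession; note that the resulting vector length is the number of distinct indices actually present, not the full 1000, so this only fits some use cases:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.VectorAssembler

val spark = SparkSession.builder.master("local[*]").appName("pivot-assemble").getOrCreate()
import spark.implicits._

val df = Seq((11830, 1, 8), (11113, 1, 3), (11113, 3, 2)).toDF("id", "weight", "index")

// Pivot so each distinct index value becomes its own column, then fill
// the absent (id, index) combinations with 0.
val pivoted = df.groupBy("id").pivot("index").sum("weight").na.fill(0)

// VectorAssembler combines all the index columns into a single vector column.
val assembler = new VectorAssembler()
  .setInputCols(pivoted.columns.filter(_ != "id"))
  .setOutputCol("features")

val assembled = assembler.transform(pivoted)
assembled.show(false)
```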

With some small tweaks you can get your approach working:

:paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val df = Seq((11830,1,8), (11113, 1, 3), (1081, 1,3), (2654, 1, 3), (10633, 1, 3), (11830, 1, 28), (11351, 1, 12), (2737, 1, 26), (11113, 3, 2), (6590, 1, 2)).toDF("id", "weight", "index")

val dfWithFeat = df
  .rdd
  .map(r => (r.getInt(0), (r.getInt(2), r.getInt(1).toDouble)))
  .groupByKey()
  .map(r => LabeledPoint(r._1, Vectors.sparse(1000, r._2.toSeq)))
  .toDS

dfWithFeat.printSchema
dfWithFeat.show(10, false)


// Exiting paste mode, now interpreting.

root
|-- label: double (nullable = true)
|-- features: vector (nullable = true)

+-------+-----------------------+
|label  |features               |
+-------+-----------------------+
|11113.0|(1000,[2,3],[3.0,1.0]) |
|2737.0 |(1000,[26],[1.0])      |
|10633.0|(1000,[3],[1.0])       |
|1081.0 |(1000,[3],[1.0])       |
|6590.0 |(1000,[2],[1.0])       |
|11830.0|(1000,[8,28],[1.0,1.0])|
|2654.0 |(1000,[3],[1.0])       |
|11351.0|(1000,[12],[1.0])      |
+-------+-----------------------+

dfWithFeat: org.apache.spark.sql.Dataset[org.apache.spark.mllib.regression.LabeledPoint] = [label: double, features: vector]
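For completeness, your original UDF approach can also be made to type-check with small tweaks (a sketch, assuming the same df and a spark-shell session where spark.implicits._ is already in scope): collect_list on integer columns comes back as Seq[Int], not Array, and the pair-based Vectors.sparse overload accepts the pairs unordered.

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.{collect_list, udf}

// The UDF takes Seq[Int] (what collect_list on Int columns returns) and
// builds the vector from zipped (index, value) pairs, which Vectors.sparse
// accepts in any order.
val toSparseVector = udf { (indices: Seq[Int], weights: Seq[Int]) =>
  Vectors.sparse(1000, indices.zip(weights.map(_.toDouble)))
}

val dfWithSparseVector = df
  .groupBy("id")
  .agg(collect_list("index") as "indices", collect_list("weight") as "weights")
  .withColumn("SparseVector", toSparseVector($"indices", $"weights"))

dfWithSparseVector.show(10, false)
```

This relies on the two collect_list aggregates seeing the rows of each group in the same order, so the indices and weights stay aligned.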