Ignacio Alorre - 3 years ago
Scala Question

Spark - after a withColumn("newCol", collect_list(...)) select rows with more than one element

I am working with a DataFrame created from this JSON:

{"id" : "1201", "name" : "satish", "age" : "25"},
{"id" : "1202", "name" : "krishna", "age" : "28"},
{"id" : "1203", "name" : "amith", "age" : "39"},
{"id" : "1204", "name" : "javed", "age" : "23"},
{"id" : "1205", "name" : "mendy", "age" : "25"},
{"id" : "1206", "name" : "rob", "age" : "24"},
{"id" : "1207", "name" : "prudvi", "age" : "23"}


Initially the DataFrame looks like this:

+---+----+-------+
|age|  id|   name|
+---+----+-------+
| 25|1201| satish|
| 28|1202|krishna|
| 39|1203|  amith|
| 23|1204|  javed|
| 25|1205|  mendy|
| 24|1206|    rob|
| 23|1207| prudvi|
+---+----+-------+


What I need is to group all students with the same age, ordered by their id. This is how I am approaching it so far:

*Note: I'm pretty sure there are more efficient ways than adding a new column with withColumn("newCol", ...) only to then select("newCol"), but I don't know how to solve it better.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("SimpleApp").set("spark.driver.allowMultipleContexts", "true").setMaster("local[*]")
val sc = new SparkContext(conf)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val df = sqlContext.read.json("students.json")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

// Collect the students of each age into a list, ordered by id within the age partition
val mergedDF = df.withColumn("newCol", collect_list(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id"))).select("newCol")


The output I am getting is this:

[WrappedArray([25,1201,satish], [25,1205,mendy])]
[WrappedArray([24,1206,rob])]
[WrappedArray([23,1204,javed])]
[WrappedArray([23,1204,javed], [23,1207,prudvi])]
[WrappedArray([28,1202,krishna])]
[WrappedArray([39,1203,amith])]


Now, how can I filter the rows which have more than one element? That is, I want my final DataFrame to be:

[WrappedArray([25,1201,satish], [25,1205,mendy])]
[WrappedArray([23,1204,javed], [23,1207,prudvi])]


My best approach so far is:

val mergedDF = df.withColumn("newCol", collect_list(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id")))

val filterd = mergedDF.withColumn("count", count("age").over(Window.partitionBy("age"))).filter($"count" > 1).select("newCol")


But I must be missing something, because the result is not the expected one:

[WrappedArray([23,1204,javed], [23,1207,prudvi])]
[WrappedArray([25,1201,satish])]
[WrappedArray([25,1201,satish], [25,1205,mendy])]
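
The partial lists above are actually expected: when a window spec has an orderBy, the default frame runs from the start of the partition up to the current row, so collect_list accumulates row by row instead of seeing the whole age group at once. Below is a sketch of an aggregation-based alternative (not from the original post, and assuming a Spark version where collect_list accepts struct columns and sort_array can order arrays of structs) that produces exactly one complete list per age:

import org.apache.spark.sql.functions._

// groupBy aggregates over the whole age group, so each age yields a single row.
// sort_array orders the structs by their fields (age first, then id), which gives
// the per-age ordering by id that the window's orderBy was meant to provide.
val groupedDF = df
  .groupBy("age")
  .agg(sort_array(collect_list(struct("age", "id", "name"))).as("newCol"))
  .filter(size($"newCol") > 1)

groupedDF.select("newCol").show(false)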

Answer

You can define a UDF to filter your data:

import org.apache.spark.sql.Row

// UDF that checks whether the collected list holds more than one student
val arrLen = udf { a: Seq[Row] => a.length > 1 }

mergedDF.filter(arrLen(col("newCol"))).show(false)
+---+----+------+-----------------------------------+
|age|id  |name  |newCol                             |
+---+----+------+-----------------------------------+
|23 |1207|prudvi|[[23,1204,javed], [23,1207,prudvi]]|
|25 |1205|mendy |[[25,1201,satish], [25,1205,mendy]]|
+---+----+------+-----------------------------------+
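
A built-in alternative to the UDF (a sketch, not part of the original answer): the size function from org.apache.spark.sql.functions expresses the same filter without defining a UDF:

import org.apache.spark.sql.functions.{col, size}

// Keep only the rows whose collected list contains more than one student
mergedDF.filter(size(col("newCol")) > 1).show(false)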