rvp rvp - 2 years ago 143
Scala Question

Split multiple fields or columns of a single row and create multiple rows using Scala

I have a data-frame with 4 fields as mentioned below :

Field1 , Field2 , Field3 , Field4

I have values in the fields as below :

A1 , B1 , C1 , D1
A2 , B2,B3 , C2,C3 , D2,D3
A1 , B4,B5,B6 , C4,C5,C6 , D4,D5,D6

I have to convert it into the below format :

A1 , B1 , C1 , D1
A2 , B2 , C2 , D2
A2 , B3 , C3 , D3
A1 , B4 , C4 , D4
A1 , B5 , C5 , D5
A1 , B6 , C6 , D6

Basically I have to split the comma separated values in multiple columns and form new rows based on the values in the same order.

You can consider all of them as of type String. Can you suggest me a way to do this splitting and forming new rows based on the new values.

I could see already a question similar to this as the below one:

How to flatmap a nested Dataframe in Spark

But this question is different as I have to consider splitting multiple columns in this case and the values should not repeat.

Answer Source

You can convert DataFrame to Dataset[(String, String, String, String)] and flatMap:

import scala.util.Try

val df = Seq(
  ("A1", "B1", "C1", "D1"),
  ("A2", "B2,B3", "C2,C3", "D2,D3"),
  ("A1", "B4,B5,B6", "C4,C5,C6", "D4,D5,D6")
).toDF("x1", "x2", "x3", "x4")

// A simple sequence of expressions which allows us to flatten the results
val exprs = (0 until df.columns.size).map(i => $"value".getItem(i))

df.select($"x1", array($"x2", $"x3", $"x4")).as[(String, Seq[String])].flatMap {
  case (x1, xs) => 
    Try(xs.map(_.split(",")).transpose).map(_.map("x" +: _)).getOrElse(Seq())

// +--------+--------+--------+--------+
// |value[0]|value[1]|value[2]|value[3]|
// +--------+--------+--------+--------+
// |      A1|      B1|      C1|      D1|
// |      A2|      B2|      C2|      D2|
// |      A2|      B3|      C3|      D3|
// |      A1|      B4|      C4|      D4|
// |      A1|      B5|      C5|      D5|
// |      A1|      B6|      C6|      D6|
// +--------+--------+--------+--------+

or use an UDF:

val splitRow = udf((xs: Seq[String]) => 

// Same as before but we exclude the first column
val exprs = (0 until df.columns.size - 1).map(i => $"xs".getItem(i))

  .withColumn("xs", explode(splitRow(array($"x2", $"x3", $"x4"))))
  .select($"x1" +: exprs: _*)
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download