
Splitting columns in a Spark DataFrame into new rows [Scala]

I have output from a Spark DataFrame like the following:

Amt  |id   |num |Start_date              |Identifier
43.45|19840|A345|[2014-12-26, 2013-12-12]|[232323, 45466]
43.45|19840|A345|[2010-03-16, 2013-16-12]|[34343, 45454]

From this, my requirement is to generate output in the format below:

Amt  |id   |num |Start_date|Identifier
43.45|19840|A345|2014-12-26|232323
43.45|19840|A345|2013-12-12|45466
43.45|19840|A345|2010-03-16|34343
43.45|19840|A345|2013-16-12|45454

Can somebody help me achieve this?

Answer

Is this what you're looking for?

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val sparkSession = ...
import sparkSession.implicits._

val input = sc.parallelize(Seq(
  (43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq(232323, 45466)),
  (43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq(34343, 45454))
)).toDF("amt", "id", "num", "start_date", "identifier")

// Pair the two arrays element-wise: Seq((date1, id1), (date2, id2), ...)
val zipArrays = udf { (dates: Seq[String], identifiers: Seq[Int]) =>
  dates.zip(identifiers)
}

// explode turns each (date, identifier) pair into its own row; the exploded
// struct column is named "col" by default, hence col._1 and col._2 below
val output = input.select($"amt", $"id", $"num", explode(zipArrays($"start_date", $"identifier")))
  .select($"amt", $"id", $"num", $"col._1".as("start_date"), $"col._2".as("identifier"))

output.show()

Which returns:

+-----+-----+----+----------+----------+
|  amt|   id| num|start_date|identifier|
+-----+-----+----+----------+----------+
|43.45|19840|A345|2014-12-26|    232323|
|43.45|19840|A345|2013-12-12|     45466|
|43.45|19840|A345|2010-03-16|     34343|
|43.45|19840|A345|2013-16-12|     45454|
+-----+-----+----+----------+----------+
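
As a side note: if you happen to be on Spark 2.4 or later, the built-in arrays_zip function does the pairing for you, so no UDF is needed. A minimal sketch, reusing the same input DataFrame and assuming the zipped struct fields keep the source column names (which they do for plain column references):

// Spark 2.4+ only: arrays_zip builds the array of (start_date, identifier) structs natively
val noUdfOutput = input
  .select($"amt", $"id", $"num",
    explode(arrays_zip($"start_date", $"identifier")).as("zipped"))
  .select($"amt", $"id", $"num", $"zipped.start_date", $"zipped.identifier")

noUdfOutput.show()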

EDIT:

Since you'd like to zip multiple columns, you could try something like this:

val input = sc.parallelize(Seq(
  (43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq("232323", "45466"), Seq("123", "234")),
  (43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq("34343", "45454"), Seq("345", "456"))
)).toDF("amt", "id", "num", "start_date", "identifier", "another_column")

// Generic zip: for every index i, collect the i-th element of each array.
// This is effectively a transpose and assumes all arrays in a row have the same length.
val zipArrays = udf { seqs: Seq[Seq[String]] =>
  for (i <- seqs.head.indices) yield seqs.fold(Seq.empty)((accu, seq) => accu :+ seq(i))
}

val columnsToSelect = Seq($"amt", $"id", $"num")
val columnsToZip = Seq($"start_date", $"identifier", $"another_column")
// After the explode, the zipped values sit in an array column named "col";
// getItem(index) pulls each one back out under its original column name.
val outputColumns = columnsToSelect ++ columnsToZip.zipWithIndex.map { case (column, index) =>
  $"col".getItem(index).as(column.toString())
}

val output = input
  .select($"amt", $"id", $"num", explode(zipArrays(array(columnsToZip: _*))))
  .select(outputColumns: _*)

output.show()

/*
+-----+-----+----+----------+----------+--------------+
|  amt|   id| num|start_date|identifier|another_column|
+-----+-----+----+----------+----------+--------------+
|43.45|19840|A345|2014-12-26|    232323|           123|
|43.45|19840|A345|2013-12-12|     45466|           234|
|43.45|19840|A345|2010-03-16|     34343|           345|
|43.45|19840|A345|2013-16-12|     45454|           456|
+-----+-----+----+----------+----------+--------------+
*/
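
One caveat with the generic zipArrays UDF: seq(i) assumes every zipped array in a row has the same length, so a shorter array will fail with an IndexOutOfBoundsException. On Spark 2.4+ the multi-column case can also be written with arrays_zip instead of the UDF; a sketch along the same lines as the two-column one above:

// Spark 2.4+ only: arrays_zip accepts any number of array columns
val noUdfOutput = input
  .select($"amt", $"id", $"num",
    explode(arrays_zip($"start_date", $"identifier", $"another_column")).as("zipped"))
  .select($"amt", $"id", $"num",
    $"zipped.start_date", $"zipped.identifier", $"zipped.another_column")

noUdfOutput.show()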