Scala Question

Bloating a dataset with spark & scala

Here is my requirement

Input

customer_id  status  start_date  end_date
1            Y       20140101    20140105
2            Y       20140201    20140203


Output

customer_id  status  date
1            Y       20140101
1            Y       20140102
1            Y       20140103
1            Y       20140104
1            Y       20140105
2            Y       20140201
2            Y       20140202
2            Y       20140203


I'm trying to achieve this with a cartesian product in Spark, but it looks very inefficient. My dataset is huge, so I'm looking for a better option.

Answer

If I understood your idea correctly, you can do it this way:

  import java.time.LocalDate

  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  val sc = new SparkContext(conf)

  case class Input(customerId: Long, status: String, startDate: LocalDate, endDate: LocalDate)
  case class Output(customerId: Long, status: String, date: LocalDate)

  val input: RDD[Input] = sc.parallelize(Seq(
    Input(1, "Y", LocalDate.of(2014, 1, 1), LocalDate.of(2014, 1, 5)),
    Input(2, "Y", LocalDate.of(2014, 2, 1), LocalDate.of(2014, 2, 3))
  ))

  val result: RDD[Output] = input flatMap { row =>
    import row._
    // generate every date from startDate to endDate (inclusive)
    val dates = Stream.iterate(startDate)(_.plusDays(1)).takeWhile(!_.isAfter(endDate))
    dates.map(date => Output(customerId, status, date))
  }

  result.collect().foreach(println)
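
Because flatMap expands each input record into its own dates locally on the executors, there is no join or shuffle involved; the work grows linearly with the number of input rows and generated dates, unlike a cartesian product.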

Output:

Output(1,Y,2014-01-01)
Output(1,Y,2014-01-02)
Output(1,Y,2014-01-03)
Output(1,Y,2014-01-04)
Output(1,Y,2014-01-05)
Output(2,Y,2014-02-01)
Output(2,Y,2014-02-02)
Output(2,Y,2014-02-03)
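
If you work with the DataFrame API instead of RDDs, the same expansion can be written with built-in functions. This is only a sketch, and it assumes Spark 2.4 or later (where sequence is available); the column names mirror the question's input table:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{col, explode, sequence, to_date}

  val spark = SparkSession.builder().master("local[2]").appName("test").getOrCreate()
  import spark.implicits._

  // sample frame mirroring the question's input table
  val df = Seq(
    (1L, "Y", "20140101", "20140105"),
    (2L, "Y", "20140201", "20140203")
  ).toDF("customer_id", "status", "start_date", "end_date")

  val expanded = df
    // parse the yyyyMMdd strings into real dates
    .withColumn("start", to_date(col("start_date"), "yyyyMMdd"))
    .withColumn("end", to_date(col("end_date"), "yyyyMMdd"))
    // sequence builds an array of all dates in the range (inclusive),
    // and explode turns that array into one row per date
    .withColumn("date", explode(sequence(col("start"), col("end"))))
    .select("customer_id", "status", "date")

  expanded.show()

This keeps the expansion inside the query planner, so it also avoids any cartesian join and scales with the size of the output.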