toofrellik - 8 days ago

Scala - append RDD to itself

for (fordate <- 2 to 30) {
  val DataRDD = sc.textFile("s3n://mypath" + fordate + "/*")
  val a = 1
  val c = fordate - 1
  for (b <- a to c) {
    val cumilativeRDD1 = sc.textFile("s3n://mypath/" + b + "/*")
    val cumilativeRDD: org.apache.spark.rdd.RDD[String] = sc.union(cumilativeRDD1, cumilativeRDD)
    if (b == c) {
      val IncrementalDEviceIDs = DataRDD.subtract(cumilativeRDD)
      val countofIDs = IncrementalDEviceIDs.distinct().count()
      println("201611" + fordate + " " + countofIDs)
    }
  }
}


I have a data set where I get device IDs on a daily basis. I need to figure out the incremental count per day (how many IDs show up that were not seen on any earlier day), but when I union cumilativeRDD with itself, it throws the following error:


forward reference extends over definition of value cumilativeRDD


How can I overcome this?

Answer

The problem is this line:

val cumilativeRDD: org.apache.spark.rdd.RDD[String] = sc.union(cumilativeRDD1, cumilativeRDD)

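You're using cumilativeRDD on the right-hand side of its own definition, before it exists. Scala doesn't allow a local val to refer to itself that way, which is what the "forward reference" error is reporting. Even if it were allowed, a val declared inside the loop body is a new binding on every iteration, so it could never accumulate anything from one iteration to the next.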
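To see the pattern outside Spark, here is a minimal sketch with plain Scala Seqs standing in for the daily files. The object name `AccumulateDemo` and the sample data are hypothetical. The accumulator is a var declared before the loop, and each iteration reassigns it.

```scala
object AccumulateDemo {
  // Hypothetical daily batches of device IDs (stand-ins for the per-day files).
  val days: Seq[Seq[String]] = Seq(Seq("a", "b"), Seq("b", "c"), Seq("c", "d"))

  def cumulative(): Seq[String] = {
    // Declared once, before the loop, so every iteration updates the same variable.
    // A typed `val seen: Seq[String] = seen ++ day` inside the loop body would be
    // rejected by the compiler, just like the line in the question.
    var seen: Seq[String] = Seq.empty
    for (day <- days) seen = seen ++ day
    seen
  }

  def main(args: Array[String]): Unit = println(cumulative())
}
```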

You have to declare cumilativeRDD as a var outside both loops, initialize it on the first pass, and then union onto it on the following passes:

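import org.apache.spark.rdd.RDD

// Declared once, outside both loops, so it survives across iterations.
// None until the first day's files have been read.
var cumilativeRDD: Option[RDD[String]] = None
for (fordate <- 2 to 30) {
  // Day `fordate`'s files (assumes the same s3n://mypath/<day>/ layout as below)
  val DataRDD = sc.textFile("s3n://mypath/" + fordate + "/*")
  val c = fordate - 1
  for (b <- 1 to c) {
    val cumilativeRDD1 = sc.textFile("s3n://mypath/" + b + "/*")
    // First pass: start the cumulative RDD; afterwards: append to it.
    if (cumilativeRDD.isEmpty) cumilativeRDD = Some(cumilativeRDD1)
    else cumilativeRDD = Some(sc.union(cumilativeRDD1, cumilativeRDD.get))

    if (b == c) {
      // IDs seen on `fordate` that never appeared on any earlier day
      val IncrementalDEviceIDs = DataRDD.subtract(cumilativeRDD.get)
      val countofIDs = IncrementalDEviceIDs.distinct().count()
      println("201611" + fordate + "  " + countofIDs)
    }
  }
}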
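Note that in the code above cumilativeRDD is never reset between values of fordate, so the inner loop re-appends days 1 to c on every pass. The subtract result is unchanged, but the union keeps growing. Since each day's IDs only need to be added once, one loop that appends the previous day is enough. The sketch below shows that logic with plain Scala Sets standing in for RDDs; the object `IncrementalCounts`, the map `dailyIds`, and its sample data are hypothetical.

```scala
object IncrementalCounts {
  // Hypothetical stand-in for sc.textFile(".../<day>/*"): device IDs per day.
  val dailyIds: Map[Int, Seq[String]] = Map(
    1 -> Seq("a", "b"),
    2 -> Seq("b", "c", "c"),
    3 -> Seq("a", "d", "e")
  )

  // For each day after the first, count distinct IDs not seen on any earlier day.
  def incrementalCounts(lastDay: Int): Seq[(Int, Long)] = {
    // Cumulative IDs seen so far; starts with day 1, like cumilativeRDD.
    var seen: Set[String] = dailyIds(1).toSet
    for (day <- 2 to lastDay) yield {
      val today = dailyIds(day).toSet
      // Set analogue of DataRDD.subtract(cumulative).distinct().count()
      val newIds = (today -- seen).size.toLong
      // Append only today's IDs, so each day is added exactly once.
      seen = seen ++ today
      (day, newIds)
    }
  }

  def main(args: Array[String]): Unit =
    incrementalCounts(3).foreach { case (day, n) => println(s"day $day: $n new IDs") }
}
```

In Spark terms, the same idea is to read only day fordate - 1 inside the outer loop and union it onto the cumulative RDD (with sc.union or RDD.union), instead of looping over all earlier days every time.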