Jessi joseph Jessi joseph - 2 years ago 88
Scala Question

alternate way to proceed without list in scala

I have some Scala code like this:

/**
  * Appends to `list` the sum of column 5 over the rows of `buffer` whose
  * timestamp (column 1) lies in the closed window
  * `[list(1) - 10, list(1)]`.
  *
  * The scan stops at the first row whose timestamp is older than the window,
  * so `buffer` is expected to be ordered newest-first. Note that the appended
  * value is the plain sum, not a mean.
  *
  * @param buffer rows of one group; column 1 must parse as a Long and column 5
  *               as a Double for every row that is inspected
  * @param list   the row the window is anchored on
  * @return a copy of `list` with the window sum appended as a Double
  */
def avgCalc(buffer: Iterable[Array[String]], list: Array[String]) = {
  val anchorTimeStamp = list(1).toLong // timestamp column of the anchor row
  val windowStart = anchorTimeStamp - 10L // window reaches 10 timestamp units back

  val windowSum = buffer.iterator
    .map(row => (row(1).toLong, row))
    // Stop at the first row that is older than the window.
    .takeWhile { case (timeStamp, _) => timeStamp >= windowStart }
    // Rows newer than the anchor are skipped but do not end the scan.
    .collect { case (timeStamp, row) if timeStamp <= anchorTimeStamp => row(5).toDouble }
    .foldLeft(0.0)(_ + _)

  list :+ windowSum
}


I will call the above function like this

import spark.implicits._
// Builds the averaged data frame: rows are split into trimmed string columns,
// grouped by (tag, listener), and every row of a group is passed to avgCalc
// together with its whole group.
val averageDF =
  filterop.rdd.map(_.mkString(",")).map(line => line.split(",").map(_.trim))
    .groupBy(array => (array(0), array(2))) // group by tag and listener
    .mapValues(buffer => {
      // avgCalc stops scanning at the first row older than its window, so each
      // group must be ordered newest-first. Spark does not guarantee element
      // order inside a group after groupBy, and the timestamp column is a
      // String, so the ordering is established here, numerically, per group
      // (instead of a global string-keyed sortBy before the shuffle).
      val newestFirst = buffer.toSeq.sortBy(_(1).toLong)(Ordering[Long].reverse)
      newestFirst.map(list => avgCalc(newestFirst, list)) // calling the average function
    })
    .flatMap(x => x._2)
    .map(x => findingavg(x(0).toString, x(1).toString.toLong, x(2).toString, x(3).toString, x(4).toString, x(5).toString.toDouble, x(6).toString.toDouble)) // defining the schema through case class
    .toDF // converting to data frame


The above code works fine, but I need to get rid of `list`. My senior asked me to remove the list because it reduces the execution speed. Any suggestions for how to proceed without `list`?
Any help will be appreciated.

Answer Source

I think the following solution should work. I have tried to avoid passing both the iterable and a separate array.

/**
  * Appends a trailing-window value to every row of one group.
  *
  * For each row, the rows of `buffer` are scanned in order. Column 5 is summed
  * over the rows whose timestamp (column 1) lies in the closed window
  * `[rowTimestamp - 10, rowTimestamp]`, and the scan of that row ends at the
  * first row older than the window. `buffer` is therefore expected to be
  * ordered newest-first.
  *
  * The appended column is the window mean when the scan met a row older than
  * the window and the sum is non-zero; otherwise it is the row's own column 5,
  * normalised through `toDouble.toString`.
  *
  * @param buffer rows of one group; column 1 must parse as a Long and column 5
  *               as a Double
  * @return one output row per input row, in input order, each with the extra
  *         trailing column
  */
def avgCalc(buffer: Iterable[Array[String]]): Array[Array[String]] = {
  val windowSize = 10L

  buffer.map { row =>
    val currentTimeStamp = row(1).toLong
    val windowStart = currentTimeStamp - windowSize
    var sum = 0.0
    var count = 0
    var check = false // set once a row older than the window has been seen

    // The early exit must end only this row's scan. Breaking out of the
    // enclosing iteration would drop this row and every row after it.
    val candidates = buffer.iterator
    while (!check && candidates.hasNext) {
      val candidate = candidates.next()
      val toCheckTimeStamp = candidate(1).toLong
      if (toCheckTimeStamp < windowStart) {
        check = true
      } else if (toCheckTimeStamp <= currentTimeStamp) {
        sum += candidate(5).toDouble
        count += 1
      }
    }

    // sum != 0.0 also guarantees count > 0, so the division is safe.
    val appended =
      if (sum != 0.0 && check) (sum / count).toString
      else row(5).toDouble.toString

    row :+ appended
  }.toArray
}

and you can call it like

import sqlContext.implicits._
// Builds the averaged data frame: rows are split into trimmed string columns,
// grouped by (tag, listener), and each whole group is handed to avgCalc.
val averageDF =
  filter_op.rdd.map(_.mkString(",")).map(line => line.split(",").map(_.trim))
    .groupBy(array => (array(0), array(2)))
    .mapValues(buffer => {
        // avgCalc ends each row's scan at the first row older than the window,
        // so the group must be ordered newest-first. Spark does not guarantee
        // element order inside a group after groupBy, and the timestamp column
        // is a String, so the group is sorted here by its numeric value
        // (instead of a global string-keyed sortBy before the shuffle).
        avgCalc(buffer.toSeq.sortBy(_(1).toLong)(Ordering[Long].reverse))
    })
    .flatMap(x => x._2)
    .map(x => findingavg(x(0).toString, x(1).toString.toLong, x(2).toString, x(3).toString, x(4).toString, x(5).toString.toDouble, x(6).toString.toDouble))
    .toDF

I hope this is the desired answer

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download