J.Choi J.Choi - 2 months ago 9
Scala Question

Transform data using scala in spark

I am trying to transform the input text file into a Key/Value RDD, but the code below doesn't work.(The text file is a tab separated file.) I am really new to Scala and Spark so I would really appreciate your help.

import org.apache.spark.{SparkConf, SparkContext}
import scala.io.Source

object shortTwitter {

def main(args: Array[String]): Unit = {
for (line <- Source.fromFile(args(1).txt).getLines()) {
val newLine = line.map(line =>
val p = line.split("\t")
(p(0).toString, p(1).toInt)
)
}

val sparkConf = new SparkConf().setAppName("ShortTwitterAnalysis").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val text = sc.textFile(args(0))
val counts = text.flatMap(line => line.split("\t"))
}
}

Answer

I'm assuming you want the resulting RDD to have the type RDD[(String, Int)], so -

  • You should use map (which transforms each record into a single new record) and not flatMap (which transform each record into multiple records)
  • You should map the result of the split into a tuple

Altogether:

val counts = text
  .map(line => line.split("\t"))
  .map(arr => (arr(0), arr(1).toInt))

EDIT per clarification in comment: if you're also interested in fixing the non-Spark part (which reads the file sequentially), you have some errors in the for-comprehension syntax, here's the entire thing:

def main(args: Array[String]): Unit = {
  // read the file without Spark (not necessary when using Spark):
  val countsWithoutSpark: Iterator[(String, Int)] = for {
    line <- Source.fromFile(args(1)).getLines()
  } yield {
    val p = line.split("\t")
    (p(0), p(1).toInt)
  }

  // equivalent code using Spark:
  val sparkConf = new SparkConf().setAppName("ShortTwitterAnalysis").setMaster("local[2]")
  val sc = new SparkContext(sparkConf)
  val counts: RDD[(String, Int)] = sc.textFile(args(0))
    .map(line => line.split("\t"))
    .map(arr => (arr(0), arr(1).toInt))
}