조상진 조상진 - 3 months ago 21
Scala Question

How do I transform RDD data in Spark?

This is my code

// NOTE(review): reconstructed so it compiles — the paste had typos
// (`sparkConf`, `setAppname`, `conf_`, `inputfile` vs `inputFile`) and a
// mismatched parenthesis on the ResultSet line. Logic is unchanged; the
// user's reported output implies the code they actually ran looked like this.
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {

  // Local-mode Spark context reading a Windows-path text file.
  val conf = new SparkConf().setMaster("local").setAppName("My app")
  val sc = new SparkContext(conf)
  val inputFile = "D:/test.txt"
  val inputData = sc.textFile(inputFile)

  // Split each line on the literal "<>" delimiter, keep fields 8 and 18
  // (presumably user and shop ids — TODO confirm against the input format),
  // then count occurrences of each (user, shop) pair.
  val DupleRawData = inputData.map(_.split("\\<\\>").toList)
    .map(s => (s(8), s(18)))
    .map(s => (s, 1))
    .reduceByKey(_ + _)

  // Group the ((user, shop), count) pairs by user; sort each user's shops
  // by descending count.
  val UserShopCount = DupleRawData.groupBy(s => s._1._1)
    .map(s => (s._1, s._2.toList.sortBy(z => z._2).reverse))

  // Top 1000 (shop, count) pairs per user.
  val ResultSet = UserShopCount.map(s => (s._1, s._2.take(1000).map(z => (z._1._2, z._2))))

  ResultSet.foreach(println)
  //(aaa,List((100,4), (200,4), (300,3), (800,1)))
  //(bbb,List((100,6), (400,5), (500,4)))
  //(ccc,List((300,7), (400,6), (700,3)))
  // here now I reach..
}


and this is the result I'm getting:

(aaa,List((100,4), (200,4), (300,3), (800,1)))
(bbb,List((100,6), (400,5), (500,4)))
(ccc,List((300,7), (400,6), (700,3)))


I want the final result set RDD (of type `org.apache.spark.rdd.RDD[(String, List[(String, Int)])]`) to be:

(aaa, List((200,4), (800,1))) // because keys 100 and 300 also appear in bbb's and ccc's maps
(bbb, List((500,4))) // because keys 100 and 400 also appear in aaa's and ccc's maps
(ccc, List((700,3))) // because keys 300 and 400 also appear in aaa's and bbb's maps
..
..
..


Please give me a solution or some advice. Sincerely.

Answer

Here is my attempt:

// Sample of the question's data with short keys (100 -> 1, 200 -> 2, ...).
// FIX: the question's ccc row holds key 400 -> (4, 6); the original answer
// transcribed it as (6, 6), which is one reason its output disagreed with
// the asked-for result.
val data: Seq[(String, List[(Int, Int)])] = Seq(
  ("aaa", List((1, 4), (2, 4), (3, 3), (8, 1))),
  ("bbb", List((1, 6), (4, 5), (5, 4))),
  ("ccc", List((3, 7), (4, 6), (7, 3)))
)

// Keys that occur in exactly one user's list, i.e. keys NOT shared with any
// other user. `groupBy(identity)` buckets each key with all its occurrences;
// size == 1 means it appeared only once across every list.
val uniqKeys = data.flatMap {
  case (_, v) => {
    v.map(_._1)
  }
} groupBy(identity(_)) filter (_._2.size == 1)

// Keep only the entries whose key is unique to that user.
// FIX: the original used `!uniqKeys.contains(k)`, which kept the SHARED keys —
// the opposite of what the question asked for.
val result = data.map {
  case (pk, v) => val finalValue = v.filter {
    case (k, _) => uniqKeys.contains(k)
  }
  (pk, finalValue)
}

Output:

result: Seq[(String, List[(Int, Int)])] = List((aaa,List((1,4), (3,3))), (bbb,List((1,6))), (ccc,List((3,7))))
Comments