Mısra Turp - 4 months ago
Scala Question

Using part of the first line of the text file as the key in RDD

I have a dataset that consists of several different folders named "01" to "15", and each folder includes files named "00-00.txt" to "23-59.txt" (each folder depicting one day).

In the files I have lines like the one below (each entry starting with !AIVDM is a line, except the first one, which starts with the numbers):

1443650400.010568 !AIVDM,1,1,,B,15NOHL0P00J@uq6>h8Jr6?vN2><,0*4B

I want to have an RDD of key-value pairs, with the long value being the key and the lines starting with !AIVDM being the values. How can I achieve this?


Assuming each file is small enough to fit in a single RDD record (i.e. does not exceed 2GB), you can use SparkContext.wholeTextFiles, which reads each file into a single record, and then flatMap over these records:

// assuming the data/ folder contains folders 01, 02, ..., 15
val result: RDD[(String, String)] = sc.wholeTextFiles("data/*").values.flatMap(file => {
  val lines = file.split("\n")
  val id = lines.head.split(" ").head
  lines.tail.map((id, _))
})
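The per-file logic inside the flatMap can be checked without a SparkContext. This is a minimal plain-Scala sketch using made-up sample lines (the `!AIVDM,second,message` entries are invented for illustration): the first token of the first line becomes the key for every remaining line.

```scala
// Simulate the content of one file (sample data rows are hypothetical)
val file = Seq(
  "1443650400.010568 !AIVDM,1,1,,B,15NOHL0P00J@uq6>h8Jr6?vN2><,0*4B",
  "!AIVDM,second,message",
  "!AIVDM,third,message"
).mkString("\n")

// Same steps as the flatMap body above
val lines = file.split("\n")
val id = lines.head.split(" ").head   // "1443650400.010568"
val pairs = lines.tail.map((id, _))   // every data line keyed by the file's id
```

Each data line ends up paired with the timestamp taken from the file's first line.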
Alternatively, if that assumption isn't correct (each individual file might be large, i.e. hundreds of MB or more), you'll need to work a bit harder: load all data into a single RDD, add indices to the data, collect a map of "key" per index, and then find the right key for each data row using these indices:

// read files and zip with index to later match each data line to its key
val raw: RDD[(String, Long)] = sc.textFile("data/*").zipWithIndex().cache()

// separate data from ID rows 
val dataRows: RDD[(String, Long)] = raw.filter(_._1.startsWith("!AIVDM"))
val idRows: RDD[(String, Long)] = raw.filter(!_._1.startsWith("!AIVDM"))

// collect a map of index -> ID
val idForIndex = idRows.map { case (row, index) => (index, row.split(" ").head) }.collectAsMap()

// optimization: if idForIndex is very large - consider broadcasting it or not collecting it and using a join

// map each row to its key by looking up the MAXIMUM id index which is less than the row index
// in other words - find the LAST id record BEFORE the row
val result = dataRows.map { case (row, index) =>
  val key = idForIndex.filterKeys(_ < index).maxBy(_._1)._2
  (key, row)
}