Mısra Turp - 1 month ago
Scala Question

Using part of the first line of the text file as the key in RDD

I have a dataset that consists of several folders named "01" to "15", and each folder includes files named "00-00.txt" to "23-59.txt" (each folder depicting one day).

The files contain lines as below (each entry starting with
!AIVDM
is its own line, except the first line, which starts with the numbers):

1443650400.010568 !AIVDM,1,1,,B,15NOHL0P00J@uq6>h8Jr6?vN2><,0*4B
!AIVDM,1,1,,A,4022051uvOFD>RG7kDCm1iW0088i,0*23
!AIVDM,1,1,,A,23aIhd@P1@PHRwPM<U@`OvN2><,0*4C
!AIVDM,1,1,,A,13n1mSgP00Pgq3TQpibh0?vL2><,0*74
!AIVDM,1,1,,B,177nPmw002:<Tn<gk1toGL60><,0*2B
!AIVDM,1,1,,B,139eu9gP00PugK:N2BOP0?vL2><,0*77
!AIVDM,1,1,,A,13bg8N0P000E2<BN15IKUOvN2><,0*34
!AIVDM,1,1,,B,14bL20003ReKodINRret28P0><,0*16
!AIVDM,1,1,,B,15SkVl001EPhf?VQ5SUTaCnH0><,0*00
!AIVDM,1,1,,A,14eG;ihP00G=4CvL=7qJmOvN0><,0*25
!AIVDM,1,1,,A,14eHMQ@000G<cKrL=6nJ9QfN2><,0*30


I want to have an RDD of key-value pairs, with the numeric value
1443650400.010568
as the key and each line starting with
!AIVDM...
as a value. How can I achieve this?

Answer

Assuming each file is small enough to fit into a single RDD record (i.e. does not exceed 2GB), you can use SparkContext.wholeTextFiles, which reads each file into a single record, and then flatMap these records:

// assuming the data/ folder contains folders 01, 02, ..., 15
val result: RDD[(String, String)] = sc.wholeTextFiles("data/*").values.flatMap(file => {
  val lines = file.split("\n")
  val id = lines.head.split(" ").head
  lines.tail.map((id, _))
})
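To see what that flatMap does for each file, here is a minimal plain-Scala sketch (no Spark required) of the same parsing logic, run against a shortened copy of the sample data above:

```scala
// Local illustration of the per-file parsing inside flatMap.
// `sample` mimics one file's whole content as read by wholeTextFiles.
val sample =
  """1443650400.010568 !AIVDM,1,1,,B,15NOHL0P00J@uq6>h8Jr6?vN2><,0*4B
    |!AIVDM,1,1,,A,4022051uvOFD>RG7kDCm1iW0088i,0*23
    |!AIVDM,1,1,,A,23aIhd@P1@PHRwPM<U@`OvN2><,0*4C""".stripMargin

val lines = sample.split("\n")
val id = lines.head.split(" ").head   // the timestamp at the start of the first line
val pairs = lines.tail.map((id, _))   // (timestamp, line) for every remaining line
```

Each remaining line of the file ends up paired with the timestamp taken from the file's first line.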

Alternatively, if that assumption isn't correct (each individual file might be large, i.e. hundreds of MB or more), you'll need to work a bit harder: load all data into a single RDD, add indices to the data, collect a map of "key" per index, and then find the right key for each data row using these indices:

// read files and zip with index to later match each data line to its key
val raw: RDD[(String, Long)] = sc.textFile("data/*").zipWithIndex().cache()

// separate data from ID rows 
val dataRows: RDD[(String, Long)] = raw.filter(_._1.startsWith("!AIVDM"))
val idRows: RDD[(String, Long)] = raw.filter(!_._1.startsWith("!AIVDM"))

// collect a map of Index -> ID
val idForIndex = idRows.map { case (row, index) => (index, row.split(" ").head) }.collectAsMap()

// optimization: if idForIndex is very large - consider broadcasting it or not collecting it and using a join

// map each row to its key by looking up the MAXIMUM ID index that is smaller than the row's index
// in other words - find the LAST ID record BEFORE the row
val result = dataRows.map { case (row, index) =>
  val key = idForIndex.filterKeys(_ < index).maxBy(_._1)._2
  (key, row)
}
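The lookup in that last step can be checked locally without Spark. The sketch below uses hand-picked indices standing in for the global line numbers from zipWithIndex, and a second, made-up timestamp, to show that each data row is matched to the last ID row preceding it:

```scala
// Plain-Scala illustration of the "last ID before the row" lookup.
// The indices mimic zipWithIndex output; the second timestamp is a
// hypothetical example value, not taken from the real data.
val idForIndex: Map[Long, String] =
  Map(0L -> "1443650400.010568", 12L -> "1443650460.027431")

def keyFor(rowIndex: Long): String =
  idForIndex.filter { case (idx, _) => idx < rowIndex }.maxBy(_._1)._2

// rows 1..11 fall under the first timestamp, rows 13 and up under the second
```

Note that this lookup is linear in the number of ID rows, so it is only suitable when the collected map is small; for a large map, the broadcast or join approach mentioned in the comment above would be cheaper.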