Scala Question

Group Cartesian coordinates into cells in Apache Spark

I have a CSV file that contains a million rows of dateTime, longitude, latitude values. So far, I have extracted the minimum latitude and longitude values. My aim is to partition this dataset into cells of size 0.01 × 0.01. I would like to do the following in Scala:

create a 2D array: grid[rows][cols]

for r <- 0 until rows
  for c <- 0 until cols
    if ( tuple(longitude) >= minimumLongitude + (0.01 * r) AND tuple(longitude) < minimumLongitude + (0.01 * (r + 1))
         AND tuple(latitude) >= minimumLatitude + (0.01 * c) AND tuple(latitude) < minimumLatitude + (0.01 * (c + 1)) )
    then
      grid[r][c].append(tuple)


Basically, I want to determine which cell of the grid each point (x, y) belongs to, and group all the points that fall into the same cell so they represent that cell.
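For illustration, here is a minimal plain-Scala sketch of that nested-loop idea (Point, fillGrid and the parameter names are illustrative, not part of the original dataset); it works for small inputs but scans every cell for every point, which is why a direct index computation is preferable for large data:

// Illustrative sketch of the nested-loop pseudocode above (not the final Spark solution).
case class Point(date: String, longitude: Double, latitude: Double)

def fillGrid(points: Seq[Point],
             minLongitude: Double, minLatitude: Double,
             rows: Int, cols: Int, res: Double = 0.01): Array[Array[Vector[Point]]] = {
  // grid(r)(c) holds all points whose coordinates fall into cell (r, c)
  val grid = Array.fill(rows, cols)(Vector.empty[Point])
  for {
    r <- 0 until rows
    c <- 0 until cols
    p <- points
    if p.longitude >= minLongitude + res * r && p.longitude < minLongitude + res * (r + 1) &&
       p.latitude  >= minLatitude  + res * c && p.latitude  < minLatitude  + res * (c + 1)
  } grid(r)(c) = grid(r)(c) :+ p
  grid
}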

EDIT 1: The sample input is as follows:

(10/23/2015, -73.1111112212, 45.2)
(10/23/2015, -73.1555555121, 45.20005011)
(10/23/2015, -73.1112232113, 45.20000051)
(10/20/2015, -73.1121243113, 45.20100011)
(10/20/2015, -73.1234123412, 45.20004011)
(10/23/2015, -73.1521233123, 45.20000211)
(10/23/2015, -73.1531231233, 45.20000011)
... up to about 10 million rows.


What I have done is extract the minimum and maximum latitude and the minimum and maximum longitude, which together form the outer bounding rectangle. Now I want to divide this rectangle into cells of size 0.01 × 0.01. For example, the first cell spans (minLatitude, minLongitude) to (minLatitude + 0.01, minLongitude + 0.01). Then I would like to map each row of data to the cell it belongs to, based on the condition that

rowOfData.longitude >= cell.minLongitude && rowOfData.longitude < cell.minLongitude + 0.01 &&

rowOfData.latitude >= cell.minLatitude && rowOfData.latitude < cell.minLatitude + 0.01
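Since the cells have a fixed width, this membership test does not actually require checking every cell: each coordinate maps to exactly one cell index, which can be computed directly (a sketch; cellIndex is an illustrative helper, not from the original post):

// Direct cell-index computation: one integer division per coordinate,
// instead of testing every cell for every row.
val resolution = 0.01
def cellIndex(value: Double, minValue: Double): Int =
  ((value - minValue) / resolution).toInt

// cellIndex(rowOfData.longitude, minLongitude) -> column index
// cellIndex(rowOfData.latitude,  minLatitude)  -> row index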

Could someone please tell me how to go about this in Spark, and how to do it efficiently given the size of the dataset? Any help is greatly appreciated.

Answer

Group a dataset of coordinates into cells of a given resolution, starting at the minimum coordinates of the dataset's bounding box:

val resolution = 0.01
val sampleData = "/.../sampleGeoCoordinatesWithTs.csv"

// sparkSession is assumed to be an existing SparkSession; its implicits are
// needed for the $"col" syntax below.
import sparkSession.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, udf}

val data = sparkSession.read.option("inferSchema", "true").csv(sampleData).toDF("date", "lat", "long")

// The minimum coordinates define the origin of the grid.
val Row(minLat: Double, minLong: Double) = data.select(min($"lat"), min($"long")).head

// Maps a coordinate to the index of the cell (of width res) it falls into,
// counted from the minimum value.
def cellUdf(minValue: Double, res: Double) = udf((x: Double) => ((x - minValue) / res).toInt)
val latCoordsUdf = cellUdf(minLat, resolution)
val longCoordsUdf = cellUdf(minLong, resolution)

val relData = data.withColumn("cellx", latCoordsUdf($"lat")).withColumn("celly", longCoordsUdf($"long"))

relData.show(10)

+----------+--------------+-----------+-----+-----+
|      date|           lat|       long|cellx|celly|
+----------+--------------+-----------+-----+-----+
|10/23/2015|-73.1111112212|       45.2|    4|    0|
|10/23/2015|-73.1555555121|45.20005011|    0|    0|
|10/23/2015|-73.1112232113|45.20000051|    4|    0|
|10/20/2015|-73.1121243113|45.20100011|    4|    0|
|10/20/2015|-73.1234123412|45.20004011|    3|    0|
|10/23/2015|-73.1521233123|45.20000211|    0|    0|
|10/23/2015|-73.1531231233|45.20000011|    0|    0|
|10/23/2015|-73.1114423304|45.21100003|    4|    1|
|10/23/2015|-73.1443144233|45.22130002|    1|    2|
|10/23/2015|-73.1283500011|45.21900001|    2|    1|
+----------+--------------+-----------+-----+-----+
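If the goal is to actually collect the rows belonging to each cell (rather than just tagging every row with its cell indices), the tagged DataFrame can simply be grouped on the cell columns. A sketch, assuming the relData DataFrame from above:

import org.apache.spark.sql.functions.{collect_list, count, struct}

// Group rows by cell: collect_list gathers the points that fall into each cell,
// count gives the number of points per cell.
val pointsPerCell = relData
  .groupBy($"cellx", $"celly")
  .agg(collect_list(struct($"date", $"lat", $"long")).as("points"),
       count("*").as("numPoints"))

pointsPerCell.show(10, truncate = false)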