
Group Cartesian coordinates into cells in Apache Spark

I have a CSV file that contains a million rows of (dateTime, longitude, latitude) values. As of now, I have extracted the minimum latitude and longitude values. My aim is to form cells of size 0.01 × 0.01 over this dataset. I would like to do the following in Scala:

create a 2D array: grid[rows][cols]

for r <- 0 until rows
  for c <- 0 until cols
    if ( tuple(longitude) >= minimumLongitude + (0.01 * c) AND tuple(longitude) < minimumLongitude + (0.01 * (c + 1))
         AND tuple(latitude) >= minimumLatitude + (0.01 * r) AND tuple(latitude) < minimumLatitude + (0.01 * (r + 1)) )
    then
      grid[r][c].append(tuple)


Basically, determine which cell in the grid each point (x, y) belongs to, and group all the points that fall into the same cell.

EDIT 1: the sample input is as follows:

(10/23/2015, -73.1111112212, 45.2)
(10/23/2015, -73.1555555121, 45.20005011)
(10/23/2015, -73.1112232113, 45.20000051)
(10/20/2015, -73.1121243113, 45.20100011)
(10/20/2015, -73.1234123412, 45.20004011)
(10/23/2015, -73.1521233123, 45.20000211)
(10/23/2015, -73.1531231233, 45.20000011)
... up to about 10 million rows.


What I have done is extract the minimum and maximum longitude and the minimum and maximum latitude, so these form the big outer rectangle. Now, I want to divide this rectangle into cells of size 0.01 × 0.01. For example, the first cell spans from (minLatitude, minLongitude) to (minLatitude + 0.01, minLongitude + 0.01). Then, I would like to map each row of data to the cell it belongs to, based on the condition below (a rough sketch of the direct computation I have in mind follows the condition):

rowOfData.longitude >= cell.minLongitude && rowOfData.longitude < cell.minLongitude + 0.01 &&

rowOfData.latitude >= cell.minLatitude && rowOfData.latitude < cell.minLatitude + 0.01
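In other words, rather than scanning every cell for every point, I imagine the cell index could be computed directly from the point itself. A rough sketch of what I have in mind (cellOf is just a placeholder name, not existing code):

// Hypothetical helper: compute the (row, col) cell a single point falls into,
// assuming minLatitude and minLongitude have already been extracted.
def cellOf(longitude: Double, latitude: Double,
           minLongitude: Double, minLatitude: Double,
           cellSize: Double = 0.01): (Int, Int) = {
  val row = math.floor((latitude - minLatitude) / cellSize).toInt
  val col = math.floor((longitude - minLongitude) / cellSize).toInt
  (row, col)
}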

Could someone please tell me how to go about it, and how to do it efficiently given the size of the dataset? Any help is greatly appreciated.

Answer

Group a dataset of coordinates into cells of a given resolution, starting at the minimal coordinates of the set's bounding box:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, udf}
import sparkSession.implicits._   // enables the $"col" column syntax

val resolution = 0.01
val sampleData = "/.../sampleGeoCoordinatesWithTs.csv"

// read the CSV and name its columns
val data = sparkSession.read.option("inferSchema", "true").csv(sampleData).toDF("date", "lat", "long")

// minimal coordinates of the bounding box of the whole set
val Row(minLat: Double, minLong: Double) = data.select(min($"lat"), min($"long")).head

// maps a coordinate to its cell index, relative to the minimum value
def cellUdf(minValue: Double, res: Double) = udf((x: Double) => ((x - minValue) / res).toInt)
val latCoordsUdf = cellUdf(minLat, resolution)
val longCoordsUdf = cellUdf(minLong, resolution)

// tag every row with the indices of the cell it falls into
val relData = data.withColumn("cellx", latCoordsUdf($"lat")).withColumn("celly", longCoordsUdf($"long"))

relData.show(10)

+----------+--------------+-----------+-----+-----+
|      date|           lat|       long|cellx|celly|
+----------+--------------+-----------+-----+-----+
|10/23/2015|-73.1111112212|       45.2|    4|    0|
|10/23/2015|-73.1555555121|45.20005011|    0|    0|
|10/23/2015|-73.1112232113|45.20000051|    4|    0|
|10/20/2015|-73.1121243113|45.20100011|    4|    0|
|10/20/2015|-73.1234123412|45.20004011|    3|    0|
|10/23/2015|-73.1521233123|45.20000211|    0|    0|
|10/23/2015|-73.1531231233|45.20000011|    0|    0|
|10/23/2015|-73.1114423304|45.21100003|    4|    1|
|10/23/2015|-73.1443144233|45.22130002|    1|    2|
|10/23/2015|-73.1283500011|45.21900001|    2|    1|
+----------+--------------+-----------+-----+-----+
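
If the rows themselves need to be grouped per cell rather than just tagged, one possible follow-up (a sketch on top of the relData above, not part of the original answer) is to group by the two cell indices:

// sketch: gather the rows that fall into each cell
import org.apache.spark.sql.functions.{collect_list, struct}

val grouped = relData
  .groupBy($"cellx", $"celly")
  .agg(collect_list(struct($"date", $"lat", $"long")).as("points"))

grouped.show(10, false)

For a very large dataset, counting per cell (relData.groupBy("cellx", "celly").count()) is usually much cheaper than materializing every row per cell with collect_list, so consider whether you really need the full row lists.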