prabhu - 9 months ago 58

Scala Question

I have a CSV file containing millions of rows of (dateTime, longitude, latitude) values. So far, I have extracted the minimum latitude and longitude values. My aim is to partition this dataset into cells of size 0.01 × 0.01. I would like to do the following in Scala:

```
create a 2D array: grid[rows][cols]

for r <- 0 until rows
  for c <- 0 until cols
    if ( tuple(longitude) >= minimumLongitude + (0.01 * r) &&
         tuple(longitude) <  minimumLongitude + (0.01 * (r + 1)) &&
         tuple(latitude)  >= minimumLatitude  + (0.01 * c) &&
         tuple(latitude)  <  minimumLatitude  + (0.01 * (c + 1)) )
    then
      grid[r][c].append(tuple)
```

Basically, determine which cell of the grid each point (x, y) belongs to, and group together all the points that fall in a particular cell. Sample data:

```
(10/23/2015, -73.1111112212, 45.2)
(10/23/2015, -73.1555555121, 45.20005011)
(10/23/2015, -73.1112232113, 45.20000051)
(10/20/2015, -73.1121243113, 45.20100011)
(10/20/2015, -73.1234123412, 45.20004011)
(10/23/2015, -73.1521233123, 45.20000211)
(10/23/2015, -73.1531231233, 45.20000011)
```

... up to about 10 million rows.

What I have done is extract the minimum and maximum longitude and latitude, which form the outer bounding rectangle. Now, I want to divide this rectangle into 0.01 × 0.01 cells. For example, the first cell spans from (minLatitude, minLongitude) to (minLatitude + 0.01, minLongitude + 0.01). Then, I would like to map each row of data to the cell it belongs to, based on the condition that

```
rowOfData.longitude >= cell.minLongitude && rowOfData.longitude < cell.minLongitude + 0.01 &&
rowOfData.latitude  >= cell.minLatitude  && rowOfData.latitude  < cell.minLatitude  + 0.01
```
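Note that checking this interval condition against every cell scales as O(rows × cells); the cell index can instead be computed directly from each coordinate, which is equivalent and needs no scan. A minimal pure-Scala sketch of the idea (the name `cellIndex` and the sample minima are illustrative, not from the post):

```scala
// Map a coordinate directly to its cell index instead of testing every cell.
// toInt truncation equals floor here because every value is >= the minimum.
def cellIndex(value: Double, minValue: Double, resolution: Double): Int =
  ((value - minValue) / resolution).toInt

// Illustrative minima for the sample rows above.
val minLongitude = -73.16
val minLatitude  = 45.20

// The point (-73.1111112212, 45.20000051) lands in cell (4, 0):
val cx = cellIndex(-73.1111112212, minLongitude, 0.01) // 4
val cy = cellIndex(45.20000051, minLatitude, 0.01)     // 0
```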

Could someone please tell me how to go about this? Given the size of the dataset, efficiency matters too. Any help is greatly appreciated.

Answer Source

Group a dataset of coordinates into cells of a given resolution, starting at the minimal coordinates of the set's bounding box:

```
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, udf}

val resolution = 0.01
val sampleData = "/.../sampleGeoCoordinatesWithTs.csv"

// Read the raw CSV and name its columns.
val data = sparkSession.read
  .option("inferSchema", "true")
  .csv(sampleData)
  .toDF("date", "lat", "long")
import sparkSession.implicits._

// Minimum coordinates of the bounding box.
val Row(minLat: Double, minLong: Double) =
  data.select(min($"lat"), min($"long")).head

// Maps a coordinate to its cell index; toInt truncation equals floor
// because every value is >= minValue.
def cellUdf(minValue: Double, res: Double) =
  udf((x: Double) => ((x - minValue) / res).toInt)

val latCoordsUdf = cellUdf(minLat, resolution)
val longCoordsUdf = cellUdf(minLong, resolution)

// Attach a cell index per axis to each row.
val relData = data
  .withColumn("cellx", latCoordsUdf($"lat"))
  .withColumn("celly", longCoordsUdf($"long"))

relData.show(10)
+----------+--------------+-----------+-----+-----+
|      date|           lat|       long|cellx|celly|
+----------+--------------+-----------+-----+-----+
|10/23/2015|-73.1111112212|       45.2|    4|    0|
|10/23/2015|-73.1555555121|45.20005011|    0|    0|
|10/23/2015|-73.1112232113|45.20000051|    4|    0|
|10/20/2015|-73.1121243113|45.20100011|    4|    0|
|10/20/2015|-73.1234123412|45.20004011|    3|    0|
|10/23/2015|-73.1521233123|45.20000211|    0|    0|
|10/23/2015|-73.1531231233|45.20000011|    0|    0|
|10/23/2015|-73.1114423304|45.21100003|    4|    1|
|10/23/2015|-73.1443144233|45.22130002|    1|    2|
|10/23/2015|-73.1283500011|45.21900001|    2|    1|
+----------+--------------+-----------+-----+-----+
```