Semi Semi - 12 days ago 5
Python Question

Evaluate all combinations of rows from two dataframes

I have two large Spark DataFrames that both contain coordinates. Let's call them locations and sites:

loc = [('01', 0.2, 0.9), ('02', 0.3, 0.6), ('03', 0.8, 0.1)]
locations = sqlContext.createDataFrame(loc, schema=['id', 'X', 'Y'])

site = [('A', 0.7, 0.1), ('B', 0.3, 0.7), ('C', 0.9, 0.3), ('D', 0.3, 0.8)]
sites = sqlContext.createDataFrame(site, schema=['name', 'X', 'Y'])


locations:

+---+---+---+
| id| X| Y|
+---+---+---+
| 01|0.2|0.9|
| 02|0.3|0.6|
| 03|0.8|0.1|
+---+---+---+


sites:

+----+---+---+
|name| X| Y|
+----+---+---+
| A|0.7|0.1|
| B|0.3|0.7|
| C|0.9|0.3|
| D|0.3|0.8|
+----+---+---+


Now I want to find, for each site, the closest location in an efficient way, so that I get something like:

+----+---+
|name| id|
+----+---+
| A| 03|
| B| 02|
| C| 03|
| D| 01|
+----+---+


I was thinking of first building one large DataFrame with all the information and then using map/reduce to get the location id closest to each site. However, I have no idea whether that is the right approach or how I would go about doing it with Spark. At the moment I use this:

closest_locations = []
for s in sites.rdd.collect():
    min_dist = float('inf')
    min_loc = None
    for l in locations.rdd.collect():
        dist = (l.X - s.X)**2 + (l.Y - s.Y)**2
        if dist < min_dist:
            min_dist = dist
            min_loc = l.id
    closest_locations.append((s.name, min_loc))

selected_locations = sqlContext.createDataFrame(closest_locations, schema=['name', 'id'])


But I would like a more Spark-like approach, because the above collects everything to the driver and is obviously very slow. How can I evaluate all combinations of rows of two Spark DataFrames efficiently?

Answer

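You can compute the squared distance over the Cartesian product of the two DataFrames and keep the minimum per site: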

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import DoubleType


# Squared Euclidean distance; the square root is not needed to find the minimum.
dist = udf(lambda x1, y1, x2, y2: (x1 - x2)**2 + (y1 - y2)**2, DoubleType())

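# join() without a condition produces the Cartesian product of the two DataFrames;
# on Spark 2.1+ locations.crossJoin(sites) states this explicitly.
locations.join(sites).withColumn("dist", dist(
    locations.X, locations.Y, sites.X, sites.Y)).select(
  "name", struct("id", "dist")
# Each row is a (name, (id, dist)) pair; keep the pair with the smallest dist per name.
).rdd.reduceByKey(lambda x, y: min(x, y, key=lambda x: x[1]))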
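
To see what the join plus reduceByKey computes, here is a minimal plain-Python sketch of the same logic on the sample data from the question. It is only an illustration and does not use Spark: itertools.product stands in for the Cartesian join, a dict stands in for the keyed reduction, and the function name closest_locations is made up for this example.

```python
from itertools import product

# Sample data copied from the question: (id, X, Y) and (name, X, Y).
locations = [('01', 0.2, 0.9), ('02', 0.3, 0.6), ('03', 0.8, 0.1)]
sites = [('A', 0.7, 0.1), ('B', 0.3, 0.7), ('C', 0.9, 0.3), ('D', 0.3, 0.8)]


def closest_locations(locations, sites):
    """Return {site name: (location id, squared distance)} for the nearest location."""
    best = {}
    # product() plays the role of the unconditioned join (every location with every site).
    for (loc_id, lx, ly), (name, sx, sy) in product(locations, sites):
        candidate = (loc_id, (lx - sx) ** 2 + (ly - sy) ** 2)
        if name not in best:
            best[name] = candidate
        else:
            # Same merge rule as the reduceByKey lambda: keep the smaller distance.
            best[name] = min(best[name], candidate, key=lambda x: x[1])
    return best


result = closest_locations(locations, sites)
for name in sorted(result):
    print(name, result[name][0])
```

On the sample data this pairs each site with the same location id as the table in the question. Note that the pairwise comparison is still quadratic in the number of rows; Spark only distributes that work, it does not reduce it.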