
How to merge multiple JSON data rows based on a field in pyspark with a given reduce function

How do I merge the JSON data rows as shown below using the merge function below with pyspark?

Note: Assume this is just a minimal example; I have thousands of rows of data to merge. What is the most performant solution? For better or for worse, I must use pyspark.


data = [
    {'timestamp': '20080411204445', 'address': '100 Sunder Ct', 'name': 'Joe Schmoe'},
    {'timestamp': '20040218165319', 'address': '100 Lee Ave', 'name': 'Joe Schmoe'},
    {'timestamp': '20120309173318', 'address': '1818 Westminster', 'name': 'John Doe'},
    # ... More ...
]

Desired Output:

combined_result = [
    {'name': 'Joe Schmoe', 'addresses': [('20080411204445', '100 Sunder Ct'), ('20040218165319', '100 Lee Ave')]},
    {'name': 'John Doe', 'addresses': [('20120309173318', '1818 Westminster')]},
    # ... More ...
]

Merge function:

def reduce_on_name(a, b):
    '''Combines two JSON data rows based on name'''
    merged = {}
    if a['name'] == b['name']:
        addresses = (a['timestamp'], a['address']), (b['timestamp'], b['address'])
        merged['name'] = a['name']
        merged['addresses'] = addresses
    return merged
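
As written, reduce_on_name only merges a single pair of rows; chained through a plain reduce, the address tuples would nest one level deeper at every step. One way around that might be to key each row by name and wrap its (timestamp, address) pair in a list, so the combiner becomes associative and reduceByKey can do the merge. A rough sketch of that idea (assuming the usual SparkContext sc):

# Key each row by name; wrapping the (timestamp, address) pair in a list
# makes the reducer a plain list concatenation, which stays associative
# no matter how many rows share a name.
pairs = sc.parallelize(data).map(
    lambda row: (row['name'], [(row['timestamp'], row['address'])])
)

merged = pairs.reduceByKey(lambda a, b: a + b)

result = merged.map(
    lambda kv: {'name': kv[0], 'addresses': kv[1]}
).collect()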


With groupBy, I think it would be something like this:

(sc.parallelize(data)
   .groupBy(lambda x: x['name'])
   .map(lambda t: {'name': t[0],
                   'addresses': [(x['timestamp'], x['address']) for x in t[1]]})
   .collect())
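
For thousands of rows, the DataFrame API is usually more performant than raw RDD operations like groupBy, since the grouping runs inside the JVM rather than over pickled Python objects. A hedged sketch using collect_list (assumes a SparkSession named spark; the field names match the data above):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data)

# Group by name and gather each group's (timestamp, address) pairs
# into one array column of structs.
combined = (
    df.groupBy('name')
      .agg(F.collect_list(F.struct('timestamp', 'address')).alias('addresses'))
)

combined.show(truncate=False)

Each row of combined then carries a name plus an array of (timestamp, address) structs, which maps directly onto the desired output shape; row.asDict(recursive=True) can convert collected rows back to plain dicts if needed.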