adebesin adebesin - 4 months ago 29
Python Question

How to update a PySpark RDD of dictionaries based on some condition

First, I understand the concept of persistent data structures and immutability with regard to RDDs; "update" is the only word I could think of :)

My question is:

Given an RDD of dictionaries (or Row objects), how can I loop/map across it, apply some transformation logic, and receive back a new RDD with those transformations applied? Example:

Given an RDD containing dictionaries:

fbb = sc.parallelize(
    [{'amount_gbp': -43.33,
      'balance_gbp': 57.08,
      'type': 'GED',
      'id': 961690979,
      'settled_jrnl_cr_datetime': u'(null)',
      'virtual_cash_balance': 0,
      'virtual_debt_balance': 0},
     {'amount_gbp': 17.08,
      'balance_gbp': 40.0,
      'type': 'OIP',
      'id': 962182953,
      'settled_jrnl_cr_datetime': u'(null)',
      'virtual_cash_balance': 0,
      'virtual_debt_balance': 0}])


I attempted to apply the function:

def update_virtual_cash_balance(x):
    x.update({'virtual_cash_balance': x['amount_gbp'] + x['balance_gbp']}) if x['type'] == 'GED' else x

fbb.map(lambda x: update_virtual_cash_balance(x)).collect()


And expected:

[{'amount_gbp': -43.33,
  'balance_gbp': 57.08,
  'type': 'GED',
  'id': 961690979,
  'settled_jrnl_cr_datetime': u'(null)',
  'virtual_cash_balance': 13.75,
  'virtual_debt_balance': 0},
 {'amount_gbp': 17.08,
  'balance_gbp': 40.0,
  'type': 'OIP',
  'id': 962182953,
  'settled_jrnl_cr_datetime': u'(null)',
  'virtual_cash_balance': 0,
  'virtual_debt_balance': 0}]


But got:

Out[411]: [None, None]


Any help with what I am misunderstanding would be great.

Answer
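  • update_virtual_cash_balance has no return statement, so it returns None for every record, which is why map yields [None, None].
  • dict.update modifies the dictionary in place and returns None, so you would still get None for the 'GED' records even if the function returned the value of the conditional expression.
  • You shouldn't modify data in place. RDDs are immutable, and mutating the underlying objects can have undesired effects.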
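
The first two points can be checked in plain Python without a Spark context. This is only a small sketch: the names record, result, no_return and call_result are made up for illustration, and the record is a trimmed-down version of the first dictionary in the question.

```python
record = {'type': 'GED', 'amount_gbp': -43.33, 'balance_gbp': 57.08,
          'virtual_cash_balance': 0}

# dict.update mutates the dictionary in place and returns None.
result = record.update({'virtual_cash_balance': 13.75})

def no_return(x):
    # The conditional expression is evaluated, but its value is discarded
    # because there is no return statement, so the call returns None.
    x.update({'seen': True}) if x['type'] == 'GED' else x

# Pass a copy so this call does not touch record itself.
call_result = no_return(dict(record))

print(result, call_result)
```

Both names end up bound to None, while record itself has been changed in place, which is the side effect the third point warns about.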

Try:

def update_virtual_cash_balance(x):
    if x['type'] == 'GED':
        z = x.copy()  # shallow copy should be enough here
        z.update({'virtual_cash_balance': x['amount_gbp'] + x['balance_gbp']})
        return z
    return x
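
An equivalent variant, if you prefer to avoid the explicit copy-then-update, is to build the new dictionary in one expression with dict(x, key=value). The sketch below is an assumption-laden illustration rather than part of the original answer: with_virtual_cash_balance, records and updated are hypothetical names, the records are trimmed to the relevant keys, and the built-in map over a list stands in for the RDD so the example runs without Spark.

```python
def with_virtual_cash_balance(x):
    # Return a new dict for 'GED' records; the input is never mutated.
    if x['type'] == 'GED':
        return dict(x, virtual_cash_balance=x['amount_gbp'] + x['balance_gbp'])
    # Other records are passed through unchanged.
    return x

records = [
    {'amount_gbp': -43.33, 'balance_gbp': 57.08, 'type': 'GED',
     'virtual_cash_balance': 0},
    {'amount_gbp': 17.08, 'balance_gbp': 40.0, 'type': 'OIP',
     'virtual_cash_balance': 0},
]

# Plain-Python stand-in for: fbb.map(with_virtual_cash_balance).collect()
updated = list(map(with_virtual_cash_balance, records))

print(updated)
```

On the RDD from the question, the function can be passed to map directly, as in fbb.map(with_virtual_cash_balance).collect(); the lambda wrapper is not needed. Note that the sum is a floating-point value, so it may print as something very close to 13.75 rather than exactly 13.75.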