
Locking in dask.multiprocessing.get and adding metadata to HDF

Performing an ETL task in pure Python, I would like to collect error metrics as well as metadata for each of the raw input files considered (error metrics are computed from error codes provided in the data section of the files, while metadata is stored in the headers). Here's pseudo-code for the whole procedure:

import os

import pandas as pd
import dask
import dask.multiprocessing
from dask import delayed
from dask import dataframe as dd

META_DATA = {}  # shared resource
ERRORS = {}     # shared resource

def read_file(file_name):
    global META_DATA, ERRORS

    # step 1: process headers
    headers = read_header(file_name)
    errors = {}
    data_bfr = []

    # step 2: process data section
    for line in data_section:
        content_id, data = parse_line(line)
        if contains_errors(data):
            errors[content_id] = get_error_code(data)
        else:
            data_bfr.append((content_id, data))

    # ---- Part relevant for question 1 ----
    # step 3: acquire lock for shared resources and write metadata
    with lock.acquire():
        write_metadata(file_name, headers)  # stores metadata in META_DATA[file_name]
        write_errors(file_name, errors)     # stores error metrics in ERRORS[file_name]

    return pd.DataFrame(data=data_bfr, ...)

with dask.set_options(get=dask.multiprocessing.get):
    df = dd.from_delayed([delayed(read_file)(file_name)
                          for file_name in os.listdir(wd)])

# ---- Part relevant for question 2 ----
df.to_hdf('data.hdf', '/data', 'w', complevel=9,
          complib='blosc', ..., metadata=(META_DATA, ERRORS))


For each input file, read_file returns a pd.DataFrame and additionally writes the relevant metadata and error metrics to the shared resources. I am using dask's multiprocessing scheduler to compute a dask.dataframe from a list of delayed read_file operations.


  • Question 1: Each of the read_file operations writes to the shared resources META_DATA and ERRORS. What do I have to do to implement a proper locking strategy that works with dask.multiprocessing.get? Would it be sufficient to write the metadata and error information to the collections from within a with locket.lock_file('.lock'): context (a sketch of this follows the list)? Does multiprocessing.RLock work? What do I have to do to initialize the lock so that it works with dask? More fundamentally, how can I declare META_DATA and ERRORS as shared resources in dask?

  • Question 2: If possible, I would like to annotate the HDF data with the metadata and error metrics. From a question on "Collecting attributes from dask dataframe providers", I learned that dask currently does not support adding metadata to dataframes, but is it possible for the information to be written to HDF? If so, how should access to the shared resources be handled in this case?
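
For reference, the file-lock variant considered in Question 1 would look roughly like the sketch below (assuming the locket package; write_shared is a hypothetical helper, while write_metadata and write_errors are the routines from the pseudo-code above):

import locket

def write_shared(file_name, headers, errors):
    # Serialize writes by taking an exclusive file-based lock.
    # Note: under dask.multiprocessing.get each worker process gets its own
    # copy of META_DATA and ERRORS, so the lock alone does not make the
    # dictionaries visible across processes.
    with locket.lock_file('.lock'):
        write_metadata(file_name, headers)  # META_DATA[file_name] = headers
        write_errors(file_name, errors)     # ERRORS[file_name] = errors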


Answer

Don't depend on Globals

Dask works best with pure functions.

In particular, your case runs into a limitation of Python itself: it (correctly) doesn't share global data between processes. Instead, I recommend that you return data explicitly from your functions:

def read_file(file_name):
    ...
    return df, metadata, errors

# nout=3 marks each delayed call as returning three outputs,
# so the results can be unpacked with zip below.
values = [delayed(read_file, nout=3)(fn) for fn in filenames]
dfs, metadata, errors = zip(*values)

df = dd.from_delayed(dfs)

import toolz
metadata = delayed(toolz.merge)(metadata)
errors = delayed(toolz.merge)(errors)
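
To then materialize these delayed values and, as asked in Question 2, attach them to the HDF output, one option is to compute them after writing the dataframe and store them as HDF5 attributes via h5py. A minimal sketch, assuming the dictionaries are JSON-serializable; the attribute names and the JSON encoding are illustrative choices:

import json

import dask
import h5py

# Write the data first, then compute the merged dictionaries.
df.to_hdf('data.hdf', '/data', mode='w', complevel=9, complib='blosc')
meta, errs = dask.compute(metadata, errors)

# Attach the collected information as attributes of the /data group.
with h5py.File('data.hdf', 'a') as f:
    f['/data'].attrs['metadata'] = json.dumps(meta)
    f['/data'].attrs['errors'] = json.dumps(errs)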