
dask, execute non-serializable object on every worker

I am trying to execute the following task graph (the image produced by result.visualize() is omitted here), which is generated by this code:

energies = [10, 20]
system = delayed(make_non_serializable_object)(x=1)
trans = [delayed(some_function_that_uses_system)(system, energy) for energy in energies]
result = delayed(list)(trans)
result.visualize()


When I call result.compute() the calculation never finishes.

Calling result.compute(get=dask.async.get_sync) and result.compute(get=dask.threaded.get) both work. However, result.compute(get=dask.multiprocessing.get) does not and generates the following error:

---------------------------------------------------------------------------
RemoteError Traceback (most recent call last)
<ipython-input-70-b5c8f2a1c6f6> in <module>()
----> 1 result.compute(get=dask.multiprocessing.get)

/home/bnijholt/anaconda3/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
76 Extra keywords to forward to the scheduler ``get`` function.
77 """
---> 78 return compute(self, **kwargs)[0]
79
80 @classmethod

/home/bnijholt/anaconda3/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
169 dsk = merge(var.dask for var in variables)
170 keys = [var._keys() for var in variables]
--> 171 results = get(dsk, keys, **kwargs)
172
173 results_iter = iter(results)

/home/bnijholt/anaconda3/lib/python3.5/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
81 # Run
82 result = get_async(apply_async, len(pool._pool), dsk3, keys,
---> 83 queue=queue, get_id=_process_get_id, **kwargs)
84 finally:
85 if cleanup:

/home/bnijholt/anaconda3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
479 _execute_task(task, data) # Re-execute locally
480 else:
--> 481 raise(remote_exception(res, tb))
482 state['cache'][key] = res
483 finish_task(dsk, key, state, results, keyorder.get)

RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/home/bnijholt/anaconda3/lib/python3.5/multiprocessing/managers.py", line 228, in serve_client
request = recv()
File "/home/bnijholt/anaconda3/lib/python3.5/multiprocessing/connection.py", line 251, in recv
return ForkingPickler.loads(buf.getbuffer())
File "kwant/graph/core.pyx", line 664, in kwant.graph.core.CGraph_malloc.__cinit__ (kwant/graph/core.c:8330)
TypeError: __cinit__() takes exactly 6 positional arguments (0 given)
---------------------------------------------------------------------------

Traceback
---------
File "/home/bnijholt/anaconda3/lib/python3.5/site-packages/dask/async.py", line 273, in execute_task
queue.put(result)
File "<string>", line 2, in put
File "/home/bnijholt/anaconda3/lib/python3.5/multiprocessing/managers.py", line 732, in _callmethod
raise convert_to_error(kind, result)


With ipyparallel I would execute make_non_serializable_object on each engine, which solves the problem in that case.

I would like to use dask for my parallel computations. How can I solve this?

Answer

Ensure that your data can be serialized

This part of your traceback shows that the objects from your kwant library are not serializing well:

Traceback (most recent call last):
  File "/home/bnijholt/anaconda3/lib/python3.5/multiprocessing/managers.py", line 228, in serve_client
    request = recv()
  File "/home/bnijholt/anaconda3/lib/python3.5/multiprocessing/connection.py", line 251, in recv
    return ForkingPickler.loads(buf.getbuffer())
  File "kwant/graph/core.pyx", line 664, in kwant.graph.core.CGraph_malloc.__cinit__ (kwant/graph/core.c:8330)
TypeError: __cinit__() takes exactly 6 positional arguments (0 given)

This is why the multiprocessing and distributed schedulers fail. Dask requires the ability to serialize data in order to move it between different processes.
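You can confirm this without involving dask at all. The following minimal sketch (reusing make_non_serializable_object from your question) round-trips the object through pickle, which should reproduce the same TypeError seen in the traceback:

import pickle

system = make_non_serializable_object(x=1)
pickle.loads(pickle.dumps(system))  # fails if the object cannot be (de)serialized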

The simplest and cleanest way to solve this problem is to improve serialization of your data. Ideally you can do this by improving kwant. You could also manage this through dask's custom serialization, but that is possibly more work at the moment.
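If the object can be rebuilt cheaply from its constructor arguments, one possible workaround (a sketch, not part of kwant or dask; SerializableSystem is a hypothetical wrapper) is to implement the pickle protocol so that the object is reconstructed on the receiving process instead of having its internal state pickled:

class SerializableSystem:
    """Wraps a non-picklable object and rebuilds it on unpickling."""
    def __init__(self, x):
        self._x = x
        self.system = make_non_serializable_object(x=x)

    def __reduce__(self):
        # pickle stores only the constructor arguments; each worker
        # re-runs make_non_serializable_object when it unpickles the wrapper
        return (SerializableSystem, (self._x,))

Each worker then builds its own copy of the object, which sidesteps serialization of the object's internal state entirely.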

Keep data in one location

OK, so let's assume that you can't improve serialization and need to keep the data where it is. This restricts you to embarrassingly parallel workflows (map). There are two solutions:

  1. Use the fuse optimization
  2. Track explicitly where each task runs

Fuse

You will create some unserializable data, run computations on it, and then convert the results into something serializable before moving them back. This is fine as long as the scheduler never decides to move the intermediate data around on its own. You can enforce this by fusing all of those tasks into a single atomic task. See the optimization docs for details.

import dask
from dask import delayed
from dask.optimize import fuse

# fuse keeps each bad_data -> good_data chain inside a single task
bad_data = [delayed(make_non_serializable_object)(x=1) for energy in energies]
good_data = [delayed(convert_to_serializable_data)(bd) for bd in bad_data]
dask.compute(good_data, optimizations=[fuse])
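An equivalent manual approach (a sketch, not from the answer above, reusing the function names from the question) is to build and consume the object inside a single function, so the non-serializable intermediate never becomes its own task:

import dask.multiprocessing
from dask import delayed

def compute_transmission(energy):
    # the kwant system is created and used within one task, so only the
    # serializable return value crosses process boundaries
    system = make_non_serializable_object(x=1)
    return some_function_that_uses_system(system, energy)

trans = [delayed(compute_transmission)(energy) for energy in energies]
result = delayed(list)(trans)
result.compute(get=dask.multiprocessing.get)

The trade-off is that the object is rebuilt once per task instead of once per worker.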

Specify exactly where each computation should live yourself

See the Data Locality docs of the distributed scheduler.
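For example, with the dask.distributed scheduler you can pin both the constructor and every task that uses its result to one worker through the workers= keyword of Client.submit (a sketch; the scheduler and worker addresses are placeholders):

from distributed import Client

client = Client('127.0.0.1:8786')   # placeholder scheduler address
worker = 'tcp://127.0.0.1:8788'     # placeholder worker address

# build the kwant system on one worker and keep every task that needs it there
system = client.submit(make_non_serializable_object, x=1, workers=[worker])
trans = [client.submit(some_function_that_uses_system, system, energy, workers=[worker])
         for energy in energies]
result = client.gather(trans)       # only the serializable results travel back

Note that client.gather still requires the results of some_function_that_uses_system to be serializable, just as in the fuse approach above.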
