Custom Dask перемещаемый объект
Я использовал пользовательский словарь, подобный объекту, чтобы легко хранить результаты Dask-графа, но использование получающегося объекта для вычисления Dask-графа не вычисляет его дочерние элементы.
Можно ли изменить пользовательский объект таким образом, чтобы Dask мог обходить и вычислять его дочерние объекты?
Пример:
import dask
import dask.delayed as delayed
from collections import defaultdict
print('Dask version', dask.__version__)
Dictionary1 = {}
Dictionary1['a'] = delayed(sum)([2,3])
print('Native Dict', dask.compute(Dictionary1) )
Dictionary2 = defaultdict(defaultdict)
Dictionary2['a'] = delayed(sum)([2,3])
print('Custom Dict', dask.compute(Dictionary2) )
Полученный результат:
Dask version 0.19.2
Native Dict ({'a': 5},)
Custom Dict (defaultdict(<class 'collections.defaultdict'>, {'a': Delayed('sum-212db0df-1c14-4314-9a56-2eb87ef58abe')}),)
РЕДАКТИРОВАТЬ: Решение на основе ответа MRocklin
import dask
import dask.delayed as delayed
from collections import defaultdict
from dask.base import DaskMethodsMixin
class DefaultDictDict(defaultdict, DaskMethodsMixin):
def __init__(self, *args ): ## Define an infinite nested dict.
return defaultdict.__init__(self, DefaultDictDict, *args)
def __dask_graph__(self):
## NOTE: Errors in this functions are silent, and disable collections interface
## The dask attributes are already a graph with key to itself.
a = dict()
self._keys = []
for x in self.values():
if not hasattr(x,'dask'): ## Use dummy delayed to convert objects to graphs.
x = delayed(lambda data:data)(x)
a.update(x.dask)
self._keys.append(x.key)
return a
def __dask_keys__(self):
return self._keys
__dask_scheduler__ = staticmethod(dask.threaded.get)
def __dask_postcompute__(self):
def Reconstruct(results):
return DefaultDictDict(zip(self.keys(), results))
return Reconstruct, ()
Dictionary3 = DefaultDictDict()
Dictionary3['b']['c'] = delayed(sum)([2,3])
print('Collections Dict', dask.compute(Dictionary3)[0] )
Результат:
Collections Dict defaultdict(<class '__main__.DefaultDictDict'>, {'b': defaultdict(<class '__main__.DefaultDictDict'>, {'c': 5})})
(Это все еще показывает defaultdict
, так как __repr__
не был переопределен должным образом)
1 ответ
В настоящее время Dask проходит только через стандартные базовые коллекции Python (dicts, lists, ...). Это поведение не распространяется на 2018-10-07.
Однако вы можете создавать свои собственные коллекции Dask, которые по сути просто передают граф и ключи их участников. См. http://docs.dask.org/en/latest/custom-collections.html