Как сделать пользовательский объект доступным для функции, переданной dask df.apply (не может сериализоваться)
Весь этот код работает в пандах, но однопоточный запуск выполняется медленно.
У меня есть объект (это фильтр Блума), который медленно создается.
У меня есть dask код, который выглядит примерно так:
def has_match(row, my_filter):
return my_filter.matches(
a=row.a, b =row.b
)
# ....make dask dataframe ddf
ddf['match'] = ddf.apply(has_match, args=(my_filter, ), axis=1, meta=(bool))
ddf.compute()
Когда я пытаюсь запустить это, я получаю сообщение об ошибке:
distributed.protocol.core - CRITICAL - Failed to Serialize
Мой объект был создан из библиотеки C, поэтому я не удивлен, что его нельзя сериализовать автоматически, но я не знаю, как обойти это.
2 ответа
Распределенный ожидает, что все промежуточные результаты будут сериализуемыми. В вашем случае у вас есть объект, который не реализует рассол. В общем, у вас есть несколько вариантов (от лучших к худшим ИМХО):
Реализуйте рассол для этого объекта. Обратите внимание, что с помощью модуля copyreg вы можете добавить поддержку pickle для классов, которые не находятся под вашим контролем.
Кэшируйте создание фильтра в вашей функции вручную. Вы можете сделать это с помощью объекта или глобальной переменной в вашем модуле. Обратите внимание, что приведенный ниже код должен быть частью импортированного модуля, а не частью вашего интерактивного сеанса (то есть не в сеансе jupyter notebook/ipython).
Например (не проверено):
myfilter = None
def get_or_load():
global myfilter
if myfilter is None:
myfilter = load_filter()
else:
return myfilter
def load_filter():
pass
def has_match(row):
my_filter = get_or_load()
return my_filter.matches(a=row.a, b=row.b)
И тогда в вашем коде пользователя:
from my_filter_utils import has_match
ddf['match'] = ddf.apply(has_match, axis=1, meta=('matches', bool))
- Используйте dask для управления кешем. Для этого оберните объект в другой класс, который повторно загружает объект при сериализации. Если вы затем сохраните этот объект в кластере, dask будет удерживать его, и самое большее функция создания будет вызываться один раз на каждом узле.
Например (не проверено):
from dask import delayed
class Wrapper(object):
def __init__(self, func):
self.func = func
self.filter = func()
def __reduce__(self):
# When unpickled, the filter will be reloaded
return (Wrapper, (func,))
def load_filter():
pass
# Create a delayed function to load the filter
wrapper = delayed(Wrapper)(load_filter)
# Optionally persist the wrapper in the cluster, to be reused over multiple computations
wrapper = wrapper.persist()
def has_match(row, wrapper):
return wrapper.filter.matches(a=row.a, b=row.b)
ddf['match'] = ddf.apply(has_match, args=(wrapper,), axis=1, meta=('matches', bool))
Используйте только темы
Один из способов - полностью избежать этой проблемы и просто не использовать отдельные процессы вообще. Таким образом, вам не нужно будет сериализовать данные между ними.
ddf.compute(scheduler='threads')
Это ограничивает вас тем, что вы запускаете один процесс на одной машине, что может не соответствовать вашим ожиданиям.
Узнайте, как сериализовать ваш объект
Если вы можете выяснить, как превратить ваш объект в строку байтов и обратно, тогда вы можете реализовать протокол pickle для вашего объекта (например, __getstate__
а также __setstate__
методы, см. документы по Python) или вы можете добавить определения в диспетчеризируемые функции dask_serialize и dask_deserialize. См. Документацию по сериализации Dask для примера.
Воссоздай свои объекты каждый раз
Может быть, сложно сериализовать ваш объект, но дешево воссоздать его один раз на раздел?
def has_match(partition):
my_filter = make_filter(...)
return partition.apply(my_filter.matches(a=row.a, b =row.b))
ddf['match'] = ddf.map_partitions(has_match)