Могу ли я использовать функции, импортированные из файлов.py в Dask/Distributed?

У меня есть вопрос о сериализации и импорте.

  • должны ли функции иметь свой собственный импорт? как я видел, сделал с PySpark
  • Это просто неправильно? Есть ли mod.py должен быть пакет conda/pip? mod.py был записан в общую файловую систему.

In [1]: from distributed import Executor

In [2]: e = Executor('127.0.0.1:8786')

In [3]: e
Out[3]: <Executor: scheduler="127.0.0.1:8786" processes=2 cores=2>

In [4]: import socket

In [5]: e.run(socket.gethostname)
Out[5]: {'172.20.12.7:53405': 'n1015', '172.20.12.8:53779': 'n1016'}

In [6]: %%file mod.py
   ...: def hostname():
   ...:     return 'the hostname'
   ...: 
Overwriting mod.py

In [7]: import mod

In [8]: mod.hostname()
Out[8]: 'the hostname'

In [9]: e.run(mod.hostname)
distributed.utils - ERROR - No module named 'mod'

3 ответа

Решение

Быстрый ответ

Загрузите файл mod.py всем своим работникам. Вы можете сделать это, используя любой механизм, который вы использовали для настройки dask.distributed, или вы можете использовать метод upload_file

e.upload_file('mod.py')

В качестве альтернативы, если ваша функция выполнена в IPython, а не является частью модуля, она будет отправлена ​​без проблем.

Длинный ответ

Все это связано с тем, как функции сериализуются в Python. Функции из модулей сериализуются по имени модуля и имени функции

In [1]: from math import sin

In [2]: import pickle

In [3]: pickle.dumps(sin)
Out[3]: b'\x80\x03cmath\nsin\nq\x00.'

Так что, если клиентский компьютер хочет обратиться к math.sin функция, которую он отправляет вдоль этой строки (который вы заметите, имеет 'math' а также 'sin' в нем похоронен среди других байтов) рабочий аппарат. Работник смотрит на эту строку тестирования и говорит: "Хорошо, отлично, функция, которую я хочу, находится в том-то и таком-то модуле, позвольте мне пойти и найти это в моей локальной файловой системе. Если модуль отсутствует, то это вызовет ошибку, очень похоже на то, что вы получили выше.

Для динамически создаваемых функций (функций, которые вы создаете в IPython) он использует совершенно другой подход, объединяя весь код. Этот подход в целом работает нормально.

Вообще говоря, Dask предполагает, что все работники и клиент имеют одинаковую программную среду. Обычно это делается в основном тем, кто настраивает ваш кластер, используя какой-либо другой инструмент, например Docker. Методы как upload_file есть ли заполнить пробелы, когда у вас есть файлы или сценарии, которые обновляются чаще.

Чтобы запустить импортированную функцию в вашем кластере, которая недоступна в рабочей среде, вы также можете создать локальную функцию из импортированной функции. Эта локальная функция будет затем cloudpickle, В Python 2 вы можете достичь этого с new.function (см. новый модуль). Для Python 3 это может быть достигнуто с помощью модуля типов, но я не пробовал.

Ваш пример выше будет выглядеть так:

In [3]: import mod

In [4]: import new

In [5]: def remote(func):
   ...:     return new.function(func.func_code, func.func_globals, closure=func.func_closure)
   ...:

In [6]: e.run(remote(mod.hostname))
Out[6]: {'tcp://10.0.2.15:44208': 'the hostname'}

У меня работало добавление каталога модуля в PYTHONPATH

Другие вопросы по тегам