Передача объектов Python между задачами в Luigi?
Я писал свой первый проект на Python 3.6, используя Luigi от Spotify для организации некоторых задач обработки естественного языка в конвейере.
Я заметил, что output()
функция Task
класс всегда возвращает какой-то Target
объект, который представляет собой какой-то файл, будь то локальный или удаленный. Поскольку мои задачи создают более сложные структуры данных, такие как деревья разбора, мне довольно неудобно записывать их в файлы в виде строк и после них читать их снова.
Поэтому я хотел бы спросить, есть ли возможность передавать объекты Python между задачами в конвейере?
1 ответ
Краткий ответ: Нет.
Параметры Luigi ограничены объектами date/datetime, string, int и float. Смотрите документы для справки.
Это означает, что вам необходимо сериализовать сложную структуру данных в виде строки (используя json, msgpack, любой сериализатор, который вам нравится, и даже сжать ее) и передать ее в качестве строкового параметра.
Конечно, вы можете написать собственный подкласс Parameter, но вам нужно в основном реализовать методы serialize и parse.
Но примите во внимание: если вы будете использовать параметры вместо сохранения вычисленных данных в целевом объекте, вы потеряете одно ключевое преимущество использования Luigi: если родительская задача в дереве завершится с ошибкой больше, чем количество попыток, которые вы указали, то вы Вам нужно будет запустить задачу, которая снова вычисляет эту сложную структуру данных. Если ваши задачи вычисляют сложные данные или занимают значительное количество времени или потребляют много ресурсов, то вы должны сохранить выходные данные как цель, чтобы не делать все эти дорогостоящие вычисления снова.
И не только: эти данные могут понадобиться и другим задачам, так почему бы не сохранить их?
Также обратите внимание, что цели - это не только файлы: вы можете сохранить свои данные в таблице базы данных, Redis, Hadoop, индексе Elastic Search и многом другом: http://luigi.readthedocs.io/en/stable/api/luigi.contrib.html
Есть и другой - все еще немного хакерский - способ достижения того, что вы пытаетесь сделать с целью вместо параметров.
В MockFile есть специальная цель luigi.mock
что позволяет хранить его "файл" в памяти.
Его API похож на другие классы наследования Target, поэтому вам придется open
, read
а также write
к этому. Вдруг это только поддерживает string
вход, так что вам все еще нужно сериализовать ваш объект (это связано с отправкой этих данных через канал между процессами). Смотрите следующий пример (сериализация yaml):
import yaml
from luigi import Task
class TaskA(Task):
def output(self):
return MockFile('whatever')
def run(self):
object_to_send = yaml.dump({"example": "dict"})
_out = self.output().open('r')
_out.write(object_to_send)
_out.close()
class TaskB(Task):
def requires(self):
return TaskA()
def run(self):
_in = self.input().read('r')
serialised = _in.read()
deserialised = yaml.load(serialised)
print(deserialised)
Имейте в виду, что сериализация больших объектов может занять много времени.