Как включить динамические требования в Luigi?
Я построил трубопровод задач в Луиджи. Поскольку этот конвейер будет использоваться в разных контекстах, вполне возможно, что ему потребуется включить больше задач в начале или конце конвейера или даже совершенно разные зависимости между задачами.
Тогда я подумал: "Эй, зачем объявлять зависимости между задачами в моем конфигурационном файле?", Поэтому я добавил что-то подобное в мой config.py:
PIPELINE_DEPENDENCIES = {
"TaskA": [],
"TaskB": ["TaskA"],
"TaskC": ["TaskA"],
"TaskD": ["TaskB", "TaskC"]
}
Меня раздражало наличие этих параметров в задачах, поэтому в какой-то момент я ввел только один параметр: task_config
, что каждый Task
есть и где каждая информация или данные, необходимые для run()
хранится. Итак, я положил PIPELINE_DEPENDENCIES
прямо там.
Наконец-то я бы Task
Я определил наследовать от обоих luigi.Task
и пользовательский класс Mixin, который будет реализовывать динамический requires()
, который выглядит примерно так:
class TaskRequirementsFromConfigMixin(object):
task_config = luigi.DictParameter()
def requires(self):
required_tasks = self.task_config["PIPELINE_DEPENDENCIES"]
requirements = [
self._get_task_cls_from_str(required_task)(task_config=self.task_config)
for required_task in required_tasks
]
return requirements
def _get_task_cls_from_str(self, cls_str):
...
К сожалению, это не работает, так как запуск конвейера дает мне следующее:
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 4 were left pending, among these:
* 4 was not granted run permission by the scheduler:
- 1 TaskA(...)
- 1 TaskB(...)
- 1 TaskC(...)
- 1 TaskD(...)
Did not run any tasks
This progress looks :| because there were tasks that were not granted run permission by the scheduler
===== Luigi Execution Summary =====
и много
DEBUG: Not all parameter values are hashable so instance isn't coming from the cache
Хотя я не уверен, что это актуально.
Итак: 1. В чем моя ошибка? Это поправимо? 2. Есть ли другой способ добиться этого?
1 ответ
Я понимаю, что это старый вопрос, но недавно я узнал, как включить динамические зависимости. Я смог добиться этого, используя WrapperTask и получив понимание dict (хотя вы тоже можете сделать список, если хотите) с параметрами, которые я хотел передать другим задачам в методе requires.
Что-то вроде этого:
class WrapperTaskToPopulateParameters(luigi.WrapperTask):
date = luigi.DateMinuteParameter(interval=30, default=datetime.datetime.today())
def requires(self):
base_params = ['string', 'string', 'string', 'string', 'string', 'string']
modded_params = {modded_param:'mod' + base for base in base_params}
yield list(SomeTask(param1=key_in_dict_we_created, param2=value_in_dict_we_created) for key_in_dict_we_created,value_in_dict_we_created in modded_params.items())
Я могу опубликовать пример, используя понимание списка, если есть интерес.