Как включить динамические требования в Luigi?

Я построил трубопровод задач в Луиджи. Поскольку этот конвейер будет использоваться в разных контекстах, вполне возможно, что ему потребуется включить больше задач в начале или конце конвейера или даже совершенно разные зависимости между задачами.

Тогда я подумал: "Эй, зачем объявлять зависимости между задачами в моем конфигурационном файле?", Поэтому я добавил что-то подобное в мой config.py:

PIPELINE_DEPENDENCIES = {
     "TaskA": [],
     "TaskB": ["TaskA"],
     "TaskC": ["TaskA"],
     "TaskD": ["TaskB", "TaskC"]
}

Меня раздражало наличие этих параметров в задачах, поэтому в какой-то момент я ввел только один параметр: task_config, что каждый Task есть и где каждая информация или данные, необходимые для run() хранится. Итак, я положил PIPELINE_DEPENDENCIES прямо там.

Наконец-то я бы Task Я определил наследовать от обоих luigi.Task и пользовательский класс Mixin, который будет реализовывать динамический requires(), который выглядит примерно так:

class TaskRequirementsFromConfigMixin(object):
    task_config = luigi.DictParameter()

    def requires(self):
        required_tasks = self.task_config["PIPELINE_DEPENDENCIES"]
        requirements = [
            self._get_task_cls_from_str(required_task)(task_config=self.task_config)
            for required_task in required_tasks
        ]
        return requirements

    def _get_task_cls_from_str(self, cls_str):
        ...

К сожалению, это не работает, так как запуск конвейера дает мне следующее:

===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 were left pending, among these:
    * 4 was not granted run permission by the scheduler:
        - 1 TaskA(...)
        - 1 TaskB(...)
        - 1 TaskC(...)
        - 1 TaskD(...)

Did not run any tasks
This progress looks :| because there were tasks that were not granted run permission by the scheduler

===== Luigi Execution Summary =====

и много

DEBUG: Not all parameter values are hashable so instance isn't coming from the cache

Хотя я не уверен, что это актуально.

Итак: 1. В чем моя ошибка? Это поправимо? 2. Есть ли другой способ добиться этого?

1 ответ

Я понимаю, что это старый вопрос, но недавно я узнал, как включить динамические зависимости. Я смог добиться этого, используя WrapperTask и получив понимание dict (хотя вы тоже можете сделать список, если хотите) с параметрами, которые я хотел передать другим задачам в методе requires.

Что-то вроде этого:

      class WrapperTaskToPopulateParameters(luigi.WrapperTask):
    date = luigi.DateMinuteParameter(interval=30, default=datetime.datetime.today())

    def requires(self):
    base_params = ['string', 'string', 'string', 'string', 'string', 'string']
    modded_params = {modded_param:'mod' + base for base in base_params}
    yield list(SomeTask(param1=key_in_dict_we_created, param2=value_in_dict_we_created) for key_in_dict_we_created,value_in_dict_we_created in modded_params.items())

Я могу опубликовать пример, используя понимание списка, если есть интерес.

Другие вопросы по тегам