Как создать условную задачу в Airflow

Я хотел бы создать условную задачу в Airflow, как описано в схеме ниже. Ожидаемый сценарий следующий:

  • Задача 1 выполняется
  • Если Задача 1 выполнена успешно, выполнить Задачу 2a
  • В противном случае, если задача 1 не выполнена, выполните задачу 2b.
  • Наконец, выполните задачу 3

Условное задание Все вышеперечисленные задачи являются SSHExecuteOperator. Я предполагаю, что мне следует использовать ShortCircuitOperator и / или XCom для управления условием, но я не совсем понимаю, как это реализовать. Не могли бы вы описать решение?

3 ответа

Решение

Вы должны использовать правила запуска воздушного потока

Все операторы имеют аргумент trigger_rule, который определяет правило, по которому сгенерированная задача запускается.

Возможности триггерного правила:

ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'

Вот идея, чтобы решить вашу проблему:

from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
        task_id='task_1',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2 = SSHExecuteOperator(
        task_id='conditional_task',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2a = SSHExecuteOperator(
        task_id='task_2a',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

task_2b = SSHExecuteOperator(
        task_id='task_2b',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_FAILED,
        ssh_hook=sshHook,
        dag=dag)

task_3 = SSHExecuteOperator(
        task_id='task_3',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)


task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)

Airflow имеет BranchPythonOperator, который можно использовать для более прямого выражения зависимости ветвления.

Документы описывают его использование:

BranchPythonOperator очень похож на PythonOperator за исключением того, что он ожидает python_callable, который возвращает task_id. За возвращаемым идентификатором task_id следуют, а все остальные пути пропускаются. Идентификатор task_id, возвращаемый функцией Python, должен ссылаться на задачу, находящуюся ниже по потоку от задачи BranchPythonOperator.

...

Если вы хотите пропустить некоторые задачи, имейте в виду, что у вас не может быть пустого пути, если это так, создайте фиктивную задачу.

Позвольте мне добавить мой взгляд на это.

Прежде всего, извините за длинный пост, но я хотел поделиться полным решением, которое работает для меня.

фон

У нас есть скрипт, который извлекает данные из очень дрянного и медленного API. Это медленно, поэтому нам нужно быть избирательными в отношении того, что мы делаем и что мы не извлекаем из него (1 запрос/с с более чем 750 000 запросов). Иногда требования меняются, что заставляет нас извлекать данные полностью, но только для одна/несколько конечных точек. Поэтому нам нужно что-то, что мы можем контролировать.

Строгое ограничение скорости в 1 запрос/с с задержкой в ​​несколько секунд в случае нарушения приведет к остановке всех параллельных задач.

Значение'catchup': Trueпо сути является обратной засыпкой, которая транслируется в параметр командной строки (-c).

Между нашими задачами нет зависимостей данных, нам нужно только следовать порядку (некоторых) задач.

решение

Введение вызываемого объекта pre_execute с дополнительной конфигурацией DAG заботится о правильном пропуске задач, которые вызываютAirflowSkipException.

Во-вторых, на основе конфигурации мы можем заменить исходный оператор на простой оператор Python с тем же именем с простым определением. Таким образом, пользовательский интерфейс не будет запутан, а история триггеров будет сохранена полной, показывая выполнение, когда задача была пропущена.

      from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator

from plugins.airflow_utils import default_args, kubernetes_pod_task


# callable for pre_execute arg
def skip_if_specified(context):
    task_id = context['task'].task_id
    conf = context['dag_run'].conf or {}
    skip_tasks = conf.get('skip_task', [])
    if task_id in skip_tasks:
        raise AirflowSkipException()

# these are necessary to make this solution work
support_task_skip_args = {'pre_execute': skip_if_specified,
                          'trigger_rule': 'all_done'}
extended_args = {**default_args, **support_task_skip_args}

dag_name = 'optional_task_skip'

dag = DAG(dag_name,
          max_active_runs=3,
          schedule_interval=None,
          catchup=False,
          default_args=extended_args)

# select endpoints and modes
# !! make sure the dict items are in the same order as the order you want them to run !!
task_options = {
    'option_name_1':
        {'param': 'fetch-users', 'enabled': True, 'catchup': False},
    'option_name_2':
        {'param': 'fetch-jobs', 'enabled': True},
    'option_name_3':
        {'param': 'fetch-schedules', 'enabled': True, 'catchup': True},
    'option_name_4':
        {'param': 'fetch-messages', 'enabled': True, 'catchup': False},
    'option_name_5':
        {'param': 'fetch-holidays', 'enabled': True, 'catchup': False},
}


def add_tasks():
    task_list_ = []
    for task_name_, task_config_ in task_options.items():
        if task_config_['enabled']:
            parameter_ = task_config_['param']
            catchup_ = '-c ' if task_config_.get('catchup') else ''
            task_list_.append(
                kubernetes_pod_task(
                    dag=dag,
                    command=f"cd people_data; python3 get_people_data.py {parameter_} {catchup_}",
                    task_id=f"{task_name_}"))
            if len(task_list_) > 1:
                task_list_[-2] >> task_list_[-1]
        else:
            # the callable that throws the skip signal
            def skip_task(): raise AirflowSkipException()

            task_list_.append(
                PythonOperator(dag=dag,
                               python_callable=skip_task,
                               task_id=f"{task_name_}",
                               )
            )
            if len(task_list_) > 1:
                task_list_[-2] >> task_list_[-1]


# populate the DAG
add_tasks()

Обратите вниманиеdefault_args, kubernetes_pod_taskпросто обертки для удобства. Задача kubernetespod внедряет некоторые переменные и секреты в простую функцию и используетfrom airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperatorмодуль, я не буду и не могу поделиться ими с вами.

Решение расширяет великие идеи этого джентльмена:
https://www.youtube.com/watch?v=abLGyapcbw0

Хотя это решение работает и с операторами Kubernetes.

Конечно, это можно было бы улучшить, и вы абсолютно точно можете расширить или переработать код для разбора конфигурации ручного триггера (как показано на видео).

Вот как это выглядит в моем пользовательском интерфейсе:

(это не отражает приведенный выше пример конфигурации, а скорее фактические запуски в нашей промежуточной инфраструктуре)

Другие вопросы по тегам