Как создать условную задачу в Airflow
Я хотел бы создать условную задачу в Airflow, как описано в схеме ниже. Ожидаемый сценарий следующий:
- Задача 1 выполняется
- Если Задача 1 выполнена успешно, выполнить Задачу 2a
- В противном случае, если задача 1 не выполнена, выполните задачу 2b.
- Наконец, выполните задачу 3
Все вышеперечисленные задачи являются SSHExecuteOperator. Я предполагаю, что мне следует использовать ShortCircuitOperator и / или XCom для управления условием, но я не совсем понимаю, как это реализовать. Не могли бы вы описать решение?
3 ответа
Вы должны использовать правила запуска воздушного потока
Все операторы имеют аргумент trigger_rule, который определяет правило, по которому сгенерированная задача запускается.
Возможности триггерного правила:
ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
Вот идея, чтобы решить вашу проблему:
from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook
sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
task_1 = SSHExecuteOperator(
task_id='task_1',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2 = SSHExecuteOperator(
task_id='conditional_task',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2a = SSHExecuteOperator(
task_id='task_2a',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2b = SSHExecuteOperator(
task_id='task_2b',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_FAILED,
ssh_hook=sshHook,
dag=dag)
task_3 = SSHExecuteOperator(
task_id='task_3',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ONE_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
Airflow имеет BranchPythonOperator, который можно использовать для более прямого выражения зависимости ветвления.
Документы описывают его использование:
BranchPythonOperator очень похож на PythonOperator за исключением того, что он ожидает python_callable, который возвращает task_id. За возвращаемым идентификатором task_id следуют, а все остальные пути пропускаются. Идентификатор task_id, возвращаемый функцией Python, должен ссылаться на задачу, находящуюся ниже по потоку от задачи BranchPythonOperator.
...
Если вы хотите пропустить некоторые задачи, имейте в виду, что у вас не может быть пустого пути, если это так, создайте фиктивную задачу.
Позвольте мне добавить мой взгляд на это.
Прежде всего, извините за длинный пост, но я хотел поделиться полным решением, которое работает для меня.
фон
У нас есть скрипт, который извлекает данные из очень дрянного и медленного API. Это медленно, поэтому нам нужно быть избирательными в отношении того, что мы делаем и что мы не извлекаем из него (1 запрос/с с более чем 750 000 запросов). Иногда требования меняются, что заставляет нас извлекать данные полностью, но только для одна/несколько конечных точек. Поэтому нам нужно что-то, что мы можем контролировать.
Строгое ограничение скорости в 1 запрос/с с задержкой в несколько секунд в случае нарушения приведет к остановке всех параллельных задач.
Значение'catchup': True
по сути является обратной засыпкой, которая транслируется в параметр командной строки (-c
).
Между нашими задачами нет зависимостей данных, нам нужно только следовать порядку (некоторых) задач.
решение
Введение вызываемого объекта pre_execute с дополнительной конфигурацией DAG заботится о правильном пропуске задач, которые вызываютAirflowSkipException
.
Во-вторых, на основе конфигурации мы можем заменить исходный оператор на простой оператор Python с тем же именем с простым определением. Таким образом, пользовательский интерфейс не будет запутан, а история триггеров будет сохранена полной, показывая выполнение, когда задача была пропущена.
from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from plugins.airflow_utils import default_args, kubernetes_pod_task
# callable for pre_execute arg
def skip_if_specified(context):
task_id = context['task'].task_id
conf = context['dag_run'].conf or {}
skip_tasks = conf.get('skip_task', [])
if task_id in skip_tasks:
raise AirflowSkipException()
# these are necessary to make this solution work
support_task_skip_args = {'pre_execute': skip_if_specified,
'trigger_rule': 'all_done'}
extended_args = {**default_args, **support_task_skip_args}
dag_name = 'optional_task_skip'
dag = DAG(dag_name,
max_active_runs=3,
schedule_interval=None,
catchup=False,
default_args=extended_args)
# select endpoints and modes
# !! make sure the dict items are in the same order as the order you want them to run !!
task_options = {
'option_name_1':
{'param': 'fetch-users', 'enabled': True, 'catchup': False},
'option_name_2':
{'param': 'fetch-jobs', 'enabled': True},
'option_name_3':
{'param': 'fetch-schedules', 'enabled': True, 'catchup': True},
'option_name_4':
{'param': 'fetch-messages', 'enabled': True, 'catchup': False},
'option_name_5':
{'param': 'fetch-holidays', 'enabled': True, 'catchup': False},
}
def add_tasks():
task_list_ = []
for task_name_, task_config_ in task_options.items():
if task_config_['enabled']:
parameter_ = task_config_['param']
catchup_ = '-c ' if task_config_.get('catchup') else ''
task_list_.append(
kubernetes_pod_task(
dag=dag,
command=f"cd people_data; python3 get_people_data.py {parameter_} {catchup_}",
task_id=f"{task_name_}"))
if len(task_list_) > 1:
task_list_[-2] >> task_list_[-1]
else:
# the callable that throws the skip signal
def skip_task(): raise AirflowSkipException()
task_list_.append(
PythonOperator(dag=dag,
python_callable=skip_task,
task_id=f"{task_name_}",
)
)
if len(task_list_) > 1:
task_list_[-2] >> task_list_[-1]
# populate the DAG
add_tasks()
Обратите вниманиеdefault_args, kubernetes_pod_task
просто обертки для удобства. Задача kubernetespod внедряет некоторые переменные и секреты в простую функцию и используетfrom airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
модуль, я не буду и не могу поделиться ими с вами.
Решение расширяет великие идеи этого джентльмена:
https://www.youtube.com/watch?v=abLGyapcbw0
Хотя это решение работает и с операторами Kubernetes.
Конечно, это можно было бы улучшить, и вы абсолютно точно можете расширить или переработать код для разбора конфигурации ручного триггера (как показано на видео).
Вот как это выглядит в моем пользовательском интерфейсе:
(это не отражает приведенный выше пример конфигурации, а скорее фактические запуски в нашей промежуточной инфраструктуре)