Воздушный поток - правильный способ обработки обратных вызовов DAG
У меня есть DAG
и затем всякий раз, когда это успешно или неудачно, я хочу, чтобы он вызывал метод, который отправляет сообщения в Slack.
мой DAG args
как показано ниже:
default_args = {
[...]
'on_failure_callback': slack.slack_message(sad_message),
'on_success_callback': slack.slack_message(happy_message),
[...]
}
И DAG
само определение:
dag = DAG(
dag_id = dag_name_id,
default_args=default_args,
description='load data from mysql to S3',
schedule_interval='*/10 * * * *',
catchup=False
)
Но когда я проверяю Slack, каждую минуту появляется более 100 сообщений, как будто выполняется оценка каждого такта планировщика и для каждого журнала он запускает метод успеха и сбоя, как если бы он работал и не работал для одного и того же экземпляра задачи (не отлично).
Как правильно использовать on_failure_callback
а также on_success_callback
обрабатывать статусы Дагса и вызывать пользовательский метод?
2 ответа
Причина, по которой он создает сообщения, заключается в том, что когда вы определяете default_args
, вы выполняете функции. Вам нужно просто передать определение функции, не выполняя ее.
Так как у функции есть аргумент, она станет немного хитрее. Вы можете определить две частичные функции или две функции-оболочки.
Так что вы можете сделать:
from functools import partial
success_msg = partial(slack.slack_message, happy_message);
failure_msg = partial(slack.slack_message, sad_message);
default_args = {
[...]
'on_failure_callback': failure_msg
'on_success_callback': success_msg
[...]
}
или же
def success_msg():
slack.slack_message(happy_message);
def failure_msg():
slack.slack_message(sad_message);
default_args = {
[...]
'on_failure_callback': failure_msg
'on_success_callback': success_msg
[...]
}
В любом методе обратите внимание, как просто определение функции failure_msg
а также success_msg
передаются, а не результат, который они дают при исполнении.
Default _args расширяется на уровне задачи, поэтому становится обратным вызовом для каждой задачи
применить атрибут на уровне флага DAG за пределами default_args
Что slack
метод вы имеете в виду? Планировщик анализирует ваш файл DAG каждый такт, поэтому если slack
некоторая функция, определенная в вашем коде, она будет запускаться при каждом ударе.
Несколько вещей, которые вы можете попробовать:
Определите функции, которые вы хотите вызывать как PythonOperators, а затем вызывайте их на уровне задач, а не на уровне DAG.
Вы также можете использовать TriggerRules для задания задач ниже по течению от вашей задачи ETL, которые будут запускаться в зависимости от сбоя или успеха родительской задачи.
Из документов:
defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | dummy}
Вы можете найти пример того, как это будет выглядеть здесь (полное раскрытие - я автор).