Воздушный поток - правильный способ обработки обратных вызовов DAG

У меня есть DAG и затем всякий раз, когда это успешно или неудачно, я хочу, чтобы он вызывал метод, который отправляет сообщения в Slack.

мой DAG args как показано ниже:

default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}

И DAG само определение:

dag = DAG(
    dag_id = dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
      )

Но когда я проверяю Slack, каждую минуту появляется более 100 сообщений, как будто выполняется оценка каждого такта планировщика и для каждого журнала он запускает метод успеха и сбоя, как если бы он работал и не работал для одного и того же экземпляра задачи (не отлично).

Как правильно использовать on_failure_callback а также on_success_callback обрабатывать статусы Дагса и вызывать пользовательский метод?

2 ответа

Причина, по которой он создает сообщения, заключается в том, что когда вы определяете default_args, вы выполняете функции. Вам нужно просто передать определение функции, не выполняя ее.

Так как у функции есть аргумент, она станет немного хитрее. Вы можете определить две частичные функции или две функции-оболочки.

Так что вы можете сделать:

from functools import partial

success_msg = partial(slack.slack_message, happy_message);
failure_msg = partial(slack.slack_message, sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

или же

def success_msg():
    slack.slack_message(happy_message);

def failure_msg():
    slack.slack_message(sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

В любом методе обратите внимание, как просто определение функции failure_msg а также success_msg передаются, а не результат, который они дают при исполнении.

Default _args расширяется на уровне задачи, поэтому становится обратным вызовом для каждой задачи

применить атрибут на уровне флага DAG за пределами default_args

Что slack метод вы имеете в виду? Планировщик анализирует ваш файл DAG каждый такт, поэтому если slack некоторая функция, определенная в вашем коде, она будет запускаться при каждом ударе.

Несколько вещей, которые вы можете попробовать:

  • Определите функции, которые вы хотите вызывать как PythonOperators, а затем вызывайте их на уровне задач, а не на уровне DAG.

  • Вы также можете использовать TriggerRules для задания задач ниже по течению от вашей задачи ETL, которые будут запускаться в зависимости от сбоя или успеха родительской задачи.

Из документов: defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | dummy}

Вы можете найти пример того, как это будет выглядеть здесь (полное раскрытие - я автор).

Другие вопросы по тегам