Сбой воздушного потока

Как я могу настроить Airflow так, чтобы любой сбой в группе обеспечения доступности баз данных (немедленно) приводил к появлению слабого сообщения?

На данный момент я управляю этим путем создания slack_failed_task:

slack_failed_task =  SlackAPIPostOperator(
    task_id='slack_failed',
    channel="#datalabs",
    trigger_rule='one_failed',
    token="...",
    text = ':red_circle: DAG Failed',
    icon_url = 'http://airbnb.io/img/projects/airflow3.png',
    dag=dag)

И установите эту задачу (one_failed) в восходящем направлении друг от друга в DAG:

slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e

Это работает, но это подвержено ошибкам, так как забыв добавить задачу, пропускает слабые уведомления и кажется большой работой.

Есть ли способ расширить email_on_failure недвижимость в DAG?

Бонус;-) за включение способа передачи имени невыполненной задачи в сообщение.

6 ответов

Решение

Может быть, этот пример будет полезен:

def slack_failed_task(contextDictionary, **kwargs):  
       failed_alert = SlackAPIPostOperator(
         task_id='slack_failed',
         channel="#datalabs",
         token="...",
         text = ':red_circle: DAG Failed',
         owner = '_owner',)
         return failed_alert.execute


task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)

Попробуйте новый SlackWebhookOperator, который есть в версии Airflow>=1.10.0

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

slack_msg="Hi Wssup?"

slack_test =  SlackWebhookOperator(
    task_id='slack_test',
    http_conn_id='slack_connection',
    webhook_token='/1234/abcd',
    message=slack_msg,
    channel='#airflow_updates',
    username='airflow_'+os.environ['ENVIRONMENT'],
    icon_emoji=None,
    link_names=False,
    dag=dag)

Примечание: убедитесь, что у вас есть slack_connection добавлены в ваши соединения Airflow как

host=https://hooks.slack.com/services/

Как я могу настроить Airflow так, чтобы любой сбой в DAG (немедленно) приводил к появлению сообщения о резерве?

С использованием airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook вы можете добиться этого, передав on_failure_callback функционировать на уровне DAG.

Бонус ;-) за включение в сообщение способа передать имя неудавшейся задачи.

      
def fail():
    raise Exception("Task failed intentionally for testing purpose")

def success():
    print("success")

def task_fail_slack_alert(context):
    tis_dagrun = context['ti'].get_dagrun().get_task_instances()
    failed_tasks = []
    for ti in tis_dagrun:
        if ti.state == State.FAILED:
            # Adding log url
            failed_tasks.append(f"<{ti.log_url}|{ti.task_id}>")
    
    dag=context.get('task_instance').dag_id
    exec_date=context.get('execution_date')

    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": ":red_circle: Dag Failed."
            }
        },
        {
            "type": "section",
            "block_id": f"section{uuid.uuid4()}",
            "text": {
                "type": "mrkdwn",
                "text": f"*Dag*: {dag} \n *Execution Time*: {exec_date}"
            },
            "accessory": {
                "type": "image",
                "image_url": "https://raw.githubusercontent.com/apache/airflow/main/airflow/www/static/pin_100.png",
                "alt_text": "Airflow"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"Failed Tasks: {', '.join(failed_tasks)}"
            }
        }
    ]
    failed_alert = SlackWebhookHook(
        http_conn_id='slack-airflow',
        channel="#airflow-notifications",    
        blocks=blocks,
        username='airflow'
    )
    failed_alert.execute()
    return 

default_args = {
    'owner': 'airflow'
}
with DAG(
    dag_id="slack-test",
    default_args=default_args,
    start_date=datetime(2021,8,19),
    schedule_interval=None,
    on_failure_callback=task_fail_slack_alert
) as dag:

    task_1 = PythonOperator(
        task_id="slack_notification_test",
        python_callable=fail
    )

    task_2 = PythonOperator(
        task_id="slack_notification_test2",
        python_callable=success
    )

BaseOperator поддерживает параметр on_failure_callback:

on_failure_callback (callable) - функция, вызываемая при сбое экземпляра задачи. контекстный словарь передается в качестве отдельного параметра этой функции. Контекст содержит ссылки на связанные объекты с экземпляром задачи и задокументирован в разделе макросов API.

Я не проверял это, но вы должны быть в состоянии определить функцию, которая отправляет сообщения, чтобы провисать при неудаче и передавать ее в каждое определение задачи. Чтобы получить имя текущей задачи, вы можете использовать шаблон {{ task_id }}.

Я бы предпочел добавить обратный вызов в DAG и быть унаследованным всеми его задачами:

def on_failure_callback(context):
    webhook_url = os.getenv('SLACK_WEBHOOK_TOKEN')
    slack_data = {
        'text': "@here DAG {} Failed".format(context['dag'].dag_id)
    }

    response = requests.post(
        webhook_url, data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'}
    )

dag = DAG(
    dag_id='dag_with_templated_dir',
    start_date=datetime(2020, 1, 1),
    on_failure_callback=on_failure_callback
)

Вы можете отправить электронное письмо на слабый канал . Таким образом, вам не потребуется никакой дополнительной настройки, и вы сможете предоставить массив электронных писем: один для реальной настройки электронной почты, а другой для слабого канала.

      'email': ["realemail@gmail.com", "slackchannelname@company.slack.com"]
Другие вопросы по тегам