Сбой воздушного потока
Как я могу настроить Airflow так, чтобы любой сбой в группе обеспечения доступности баз данных (немедленно) приводил к появлению слабого сообщения?
На данный момент я управляю этим путем создания slack_failed_task:
slack_failed_task = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
trigger_rule='one_failed',
token="...",
text = ':red_circle: DAG Failed',
icon_url = 'http://airbnb.io/img/projects/airflow3.png',
dag=dag)
И установите эту задачу (one_failed) в восходящем направлении друг от друга в DAG:
slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e
Это работает, но это подвержено ошибкам, так как забыв добавить задачу, пропускает слабые уведомления и кажется большой работой.
Есть ли способ расширить email_on_failure
недвижимость в DAG?
Бонус;-) за включение способа передачи имени невыполненной задачи в сообщение.
6 ответов
Может быть, этот пример будет полезен:
def slack_failed_task(contextDictionary, **kwargs):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
token="...",
text = ':red_circle: DAG Failed',
owner = '_owner',)
return failed_alert.execute
task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)
Попробуйте новый SlackWebhookOperator, который есть в версии Airflow>=1.10.0
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
slack_msg="Hi Wssup?"
slack_test = SlackWebhookOperator(
task_id='slack_test',
http_conn_id='slack_connection',
webhook_token='/1234/abcd',
message=slack_msg,
channel='#airflow_updates',
username='airflow_'+os.environ['ENVIRONMENT'],
icon_emoji=None,
link_names=False,
dag=dag)
Примечание: убедитесь, что у вас есть slack_connection
добавлены в ваши соединения Airflow как
host=https://hooks.slack.com/services/
Как я могу настроить Airflow так, чтобы любой сбой в DAG (немедленно) приводил к появлению сообщения о резерве?
С использованием
airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook
вы можете добиться этого, передав
on_failure_callback
функционировать на уровне DAG.
Бонус ;-) за включение в сообщение способа передать имя неудавшейся задачи.
def fail():
raise Exception("Task failed intentionally for testing purpose")
def success():
print("success")
def task_fail_slack_alert(context):
tis_dagrun = context['ti'].get_dagrun().get_task_instances()
failed_tasks = []
for ti in tis_dagrun:
if ti.state == State.FAILED:
# Adding log url
failed_tasks.append(f"<{ti.log_url}|{ti.task_id}>")
dag=context.get('task_instance').dag_id
exec_date=context.get('execution_date')
blocks = [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": ":red_circle: Dag Failed."
}
},
{
"type": "section",
"block_id": f"section{uuid.uuid4()}",
"text": {
"type": "mrkdwn",
"text": f"*Dag*: {dag} \n *Execution Time*: {exec_date}"
},
"accessory": {
"type": "image",
"image_url": "https://raw.githubusercontent.com/apache/airflow/main/airflow/www/static/pin_100.png",
"alt_text": "Airflow"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"Failed Tasks: {', '.join(failed_tasks)}"
}
}
]
failed_alert = SlackWebhookHook(
http_conn_id='slack-airflow',
channel="#airflow-notifications",
blocks=blocks,
username='airflow'
)
failed_alert.execute()
return
default_args = {
'owner': 'airflow'
}
with DAG(
dag_id="slack-test",
default_args=default_args,
start_date=datetime(2021,8,19),
schedule_interval=None,
on_failure_callback=task_fail_slack_alert
) as dag:
task_1 = PythonOperator(
task_id="slack_notification_test",
python_callable=fail
)
task_2 = PythonOperator(
task_id="slack_notification_test2",
python_callable=success
)
BaseOperator поддерживает параметр on_failure_callback:
on_failure_callback (callable) - функция, вызываемая при сбое экземпляра задачи. контекстный словарь передается в качестве отдельного параметра этой функции. Контекст содержит ссылки на связанные объекты с экземпляром задачи и задокументирован в разделе макросов API.
Я не проверял это, но вы должны быть в состоянии определить функцию, которая отправляет сообщения, чтобы провисать при неудаче и передавать ее в каждое определение задачи. Чтобы получить имя текущей задачи, вы можете использовать шаблон {{ task_id }}.
Я бы предпочел добавить обратный вызов в DAG и быть унаследованным всеми его задачами:
def on_failure_callback(context):
webhook_url = os.getenv('SLACK_WEBHOOK_TOKEN')
slack_data = {
'text': "@here DAG {} Failed".format(context['dag'].dag_id)
}
response = requests.post(
webhook_url, data=json.dumps(slack_data),
headers={'Content-Type': 'application/json'}
)
dag = DAG(
dag_id='dag_with_templated_dir',
start_date=datetime(2020, 1, 1),
on_failure_callback=on_failure_callback
)
Вы можете отправить электронное письмо на слабый канал . Таким образом, вам не потребуется никакой дополнительной настройки, и вы сможете предоставить массив электронных писем: один для реальной настройки электронной почты, а другой для слабого канала.
'email': ["realemail@gmail.com", "slackchannelname@company.slack.com"]