Что произойдет с датчиком воздушного потока, работающим на работнике Celery, если сам рабочий выйдет из строя

Пример использования: в DAG определена задача Rest API (с использованием RestOperator), которая обращается к API приложения и запускает выполнение процесса / задачи, которая выполняет некоторую бизнес-функцию. Статус выполнения отслеживается с помощью датчика воздушного потока, который опрашивает статус завершения выполнения задачи через вызов API.

Вопрос:

  1. Если узел Celery выйдет из строя, что произойдет с Sensor, который работал на этом узле.
  2. если Sensor умирает вместе с worker, как передать выполнение датчика на другой узел (чтобы избежать потери функциональности).
from datetime import datetime, timedelta
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http import SimpleHttpOperator

default_args = {
    'start_date': datetime.today().strftime('%Y-%m-%d'),
    'end_date': None
}

dag = DAG(
    'Rest Monitor',
    default_args=default_args,
    schedule_interval=None,
    catchup=False)

HttpOperator = SimpleHttpOperator(
    task_id='RestOperator',
    method='POST',
    endpoint='https://localhost:8080/api/task/execute',
    headers={
        "Content-Type": "application/json"
    },
    data={
        "taskId": "1234",
    }
},
dag = dag )

def resp_check():
    return "True if Status = Success"

HttpSensor = HttpSensor(
    dag = dag,
    task_id = 'http_sensor_head_method',
    http_conn_id = 'http_default',
    endpoint = 'https://localhost:8080/api/task/1234/status',
    request_params = {},
    method = 'HEAD',
    response_check = resp_check,
    timeout = 5,
    poke_interval = 1)


HttpOperator >> HttpSensor

1 ответ

Эта логика якобы управляется ядром Airflow и его координацией рабочих через Scheduler. Как правило, планировщик опрашивает или отправляет данные о состоянии / журнале задачи с рабочего узла через регулярные промежутки времени.

По сути, Планировщик явно управляет данными о работнике и о Задаче, которой он должен управлять. Если рабочий перестает отвечать на запросы или возвращается недостаточно данных о задаче. Планировщик начнет выполнение задачи выключения и не выполнит логику попытки.

Если будет больше повторных попыток для задачи, то новая копия задачи будет добавлена ​​в очередь, чтобы все работающие рабочие в вашем кластере могли ее забрать.

Другие вопросы по тегам