Что произойдет с датчиком воздушного потока, работающим на работнике Celery, если сам рабочий выйдет из строя
Пример использования: в DAG определена задача Rest API (с использованием RestOperator), которая обращается к API приложения и запускает выполнение процесса / задачи, которая выполняет некоторую бизнес-функцию. Статус выполнения отслеживается с помощью датчика воздушного потока, который опрашивает статус завершения выполнения задачи через вызов API.
Вопрос:
- Если узел Celery выйдет из строя, что произойдет с Sensor, который работал на этом узле.
- если Sensor умирает вместе с worker, как передать выполнение датчика на другой узел (чтобы избежать потери функциональности).
from datetime import datetime, timedelta
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http import SimpleHttpOperator
default_args = {
'start_date': datetime.today().strftime('%Y-%m-%d'),
'end_date': None
}
dag = DAG(
'Rest Monitor',
default_args=default_args,
schedule_interval=None,
catchup=False)
HttpOperator = SimpleHttpOperator(
task_id='RestOperator',
method='POST',
endpoint='https://localhost:8080/api/task/execute',
headers={
"Content-Type": "application/json"
},
data={
"taskId": "1234",
}
},
dag = dag )
def resp_check():
return "True if Status = Success"
HttpSensor = HttpSensor(
dag = dag,
task_id = 'http_sensor_head_method',
http_conn_id = 'http_default',
endpoint = 'https://localhost:8080/api/task/1234/status',
request_params = {},
method = 'HEAD',
response_check = resp_check,
timeout = 5,
poke_interval = 1)
HttpOperator >> HttpSensor
1 ответ
Эта логика якобы управляется ядром Airflow и его координацией рабочих через Scheduler
. Как правило, планировщик опрашивает или отправляет данные о состоянии / журнале задачи с рабочего узла через регулярные промежутки времени.
По сути, Планировщик явно управляет данными о работнике и о Задаче, которой он должен управлять. Если рабочий перестает отвечать на запросы или возвращается недостаточно данных о задаче. Планировщик начнет выполнение задачи выключения и не выполнит логику попытки.
Если будет больше повторных попыток для задачи, то новая копия задачи будет добавлена в очередь, чтобы все работающие рабочие в вашем кластере могли ее забрать.