Планировщик воздушного потока не планирует (или медленно), когда много задач

Я работаю с воздушным потоком в Google Cloud Composer (версия: composer-1.10.2-airflow-1.10.6).

Я понял, что планировщик не планирует задачу, когда нужно обработать много задач (см. Представление Ганта ниже)

(не обращайте внимания на цвета, красные задачи - это "операторы createTable", которые терпят неудачу, если таблица уже существует, поэтому они должны терпеть неудачу 5 раз перед запуском следующей части (важной) группы DAG)

Между задачами бывают перерывы в часах! (например, 5 часов с 10 до 15 и ничего не произошло)

Обычно он отлично работает с ~40 DAG, по 100-200 задач каждый (иногда немного больше). Но недавно я добавил 2 группы DAG с большим количеством задач (~5000 каждая), и планировщик очень медленный или не планирует задачи. На скриншоте я приостановил 2 группы DAG с большим количеством задач в 15 часов вечера, и планировщик снова вернулся, выполняя свою работу нормально.

Есть ли у вас какое-нибудь решение?

Airflow - это инструмент, решающий "бесконечное" количество задач.

Вот некоторая информация о моей среде:

  • версия: composer-1.10.2-airflow-1.10.6
  • размер кластера: 6 (12 ЦП, 96 ГБ памяти)

Вот некоторая информация о конфигурации воздушного потока:

╔════════════════════════════════╦═══════╗
║ Airflow parameter              ║ value ║
╠════════════════════════════════╬═══════╣
║ -(celery)-                     ║       ║
║ worker_concurrency             ║ 32    ║
║ -(webserver)-                  ║       ║
║ default_dag_run_display_number ║ 2     ║
║ workers                        ║ 2     ║
║ worker_refresh_interval        ║ 60    ║
║ -(core)-                       ║       ║
║ max_active_runs_per_dag        ║ 1     ║
║ dagbag_import_timeout          ║ 600   ║
║ parallelism                    ║ 200   ║
║ min_file_process_interval      ║ 60    ║
║ -(scheduler)-                  ║       ║
║ processor_poll_interval        ║ 5     ║
║ max_threads                    ║ 2     ║
╚════════════════════════════════╩═══════╝

Спасибо за помощь

РЕДАКТИРОВАТЬ:

26 моих групп DAG создаются одним файлом.py путем анализа огромной переменной JSON для создания всех групп DAG и задач.

Возможно, проблема в этом, потому что сегодня Airflow планирует задачи из других групп DAG, чем 26 (особенно 2 больших DAG), которые я описал. Точнее, Airflow иногда планирует задачи 26 моих групп DAG, но гораздо легче и чаще планирует задачи других групп DAG.

2 ответа

Высокая задержка между задачами обычно является индикатором наличия узкого места, связанного с планировщиком (в отличие от чего-то, связанного с работником). Даже при многократном запуске одних и тех же групп DAG среда Composer все еще может страдать от подобных узких мест в производительности, потому что работа может распределяться каждый раз по-разному или могут быть разные процессы, выполняющиеся в фоновом режиме.

Для начала я бы порекомендовал увеличить количество потоков, доступных планировщику (scheduler.max_threads), а затем убедитесь, что ваш планировщик не использует весь ЦП узла, на котором он находится. Вы можете проверить показатели ЦП для узла, на котором находится планировщик, определив, где он находится, а затем проверив в Cloud Console. Чтобы найти имя узла:

# Obtain the Composer namespace name
kubectl get namespaces | grep composer

# Check for the scheduler
kubectl get pods -n $NAMESPACE -o wide | grep scheduler

Если вышеперечисленное не помогает, то также возможно, что планировщик намеренно блокирует какое-либо условие. Чтобы проверить все условия, которые оцениваются, когда планировщик проверяет выполнение задач, установитеcore.logging_level=DEBUG. В журналах планировщика (которые вы можете фильтровать в Cloud Logging) вы можете затем проверить все условия, которые прошли или не прошли, чтобы задача была запущена или оставалась в очереди.

Я считаю, что вам следует перейти на Composer версии 1.10.4, наличие последних исправлений всегда помогает.

С какой базой данных вы работаете? Иметь все эти неудавшиеся задачи крайне не рекомендуется. Вы можете использоватьCREATE TABLE IF NOT EXISTS ...?

Другие вопросы по тегам