Как безопасно перезапустить Airflow и убить давно выполняющуюся задачу?

У меня Airflow работает в Kubernetes с помощью CeleryExecutor. Airflow отправляет и отслеживает задания Spark с помощью DatabricksOperator.

Мои потоковые задания Spark имеют очень долгое время выполнения (они выполняются бесконечно, если они не завершатся сбоем или не будут отменены). Когда поды для Airflow worker уничтожаются во время выполнения потокового задания, происходит следующее:

  1. Связанная задача становится зомби (состояние выполняется, но нет процесса с пульсом)
  2. Задача помечена как неудачная, когда Airflow пожинает зомби
  3. Задание потоковой передачи Spark продолжает выполняться

Как я могу заставить воркера убить мою работу Spark до того, как она завершится?

Я попытался убить рабочего Celery с помощью сигнала TERM, но, видимо, это заставляет Celery перестать принимать новые задачи и ждать завершения текущих задач (документы).

1 ответ

Вам нужно более четко сформулировать проблему. Если вы говорите, что искровый кластер завершает задания, как ожидалось, и не вызывает функцию on_kill, это ожидаемое поведение. Согласно документации, функция kill предназначена для очистки после того, как задача была убита.

def on_kill(self) -> None:
    """
    Override this method to cleanup subprocesses when a task instance
    gets killed. Any use of the threading, subprocess or multiprocessing
    module within an operator needs to be cleaned up or it will leave
    ghost processes behind.
    """

В вашем случае, когда вы вручную убиваете задание, оно делает то, что должно.

Теперь, если вы хотите иметь clean_up даже после успешного завершения задания, переопределите функцию post_execute. Согласно документам. Выполнение сообщения

def post_execute(self, context: Any, result: Any = None):
    """
    This hook is triggered right after self.execute() is called.
    It is passed the execution context and any results returned by the
    operator.
    """
Другие вопросы по тегам