Функция pod_mutation_hook не работает с воздушным потоком, работающим в кубернетах с использованием KubernetesExecutor

Я пытаюсь перенести развертывание воздушного потока в кубернетах из CeleryExecutor к KubernetesExecutor. В моей локальной среде разработки (работающей на minikube) все прошло гладко, однако мне нужно загрузить контейнер sidecar в производстве, чтобы запустить прокси, который позволяет мне подключаться к моей базе данных sql. После некоторого поиска в Google выяснилось, что определение функции pod_mutation_hook вairflow_local_settings.py файл где-нибудь на $PYTHONPATH вот как это нужно делать.

Сначала я попытался определить это на карте конфигурации для этого примера. например

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
  namespace: dev
data:
  ...

  AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: "airflow-logs"

  AIRFLOW__KUBERNETES__AIRFLOW_LOCAL_SETTINGS_CONFIGMAP: "airflow-config"
  ...

  airflow_local_settings.py: |
    from airflow.contrib.kubernetes.pod import Pod

    def pod_mutation_hook(pod: Pod):
        extra_labels = {
            "test-label": "True",
        }
        pod.labels.update(extra_labels)

Я указал эту конфигурационную карту в airflow.cfg файл, и он отлично подбирается и монтируется, все остальные переменные env работают правильно, но pod_mutation_hook не запускается, так как никакие метки не добавляются к результирующему модулю, запущенному исполнителем kubernetes (обратите внимание, что здесь также указано требование тома журналов, и оно работает правильно).

Затем я попытался определить airflow_local_settings.py файл на изображении, который запускает воздушный поток для задания под $AIRFLOW_HOME/configs/airflow_local_settings.pyкак предлагается в комментарии здесь. Я также удалил соответствующие разделы изairflow-configconfigmap выше. Это также не повлияло на результирующий модуль, созданный для задания, поскольку он также не имел указанных меток.

Итак, я не уверен, как действовать на этом этапе, потому что я не понимаю, как я должен указывать airflow_local_settings.py файл и pod_mutation_hookфункционируют таким образом, что они фактически изменяют модуль перед запуском. Любая помощь будет принята с благодарностью. Спасибо.

3 ответа

Резюме:

Вы должны положить свой airflow_local_settings.py файл на PYTHONPATH для Планировщика, по крайней мере, если вы хотите, чтобы боковые вагоны на всех подах, запущенных KubernetesExecutor или KubernetesPodOperator (с другим Executor), поскольку POD для них обоих запускались планировщиком.

Однако, если вам также нужны коляски на POD, запущенные KubernetesPodOperator когда используешь KubernetesExecutor вам нужно будет установить airflow_local_settings_configmap в airflow.cfg(как это сделано на https://github.com/astronomer/airflow-chart/blob/f3dddeffe43c92d594cfcfe9c5b001680f45a986/templates/configmap.yaml), как при использовании KubernetePodOperator с KubernetesExecutor (с помощью KubernetesExecutor) рабочий POD.

Обратите внимание, как мы также передаем ту же конфигурационную карту для развертывания планировщика (https://github.com/astronomer/airflow-chart/blob/f3dddeffe43c92d594cfcfe9c5b001680f45a986/templates/scheduler/scheduler-deployment.yaml)airflow.cfg сам тоже, так как мы хотим, чтобы все POD мутировали через pod_mutation_hook.

Детали:

В планировщике должны присутствовать файлы "airflow.cfg" и "airflow_local_settings.py" (здесь не имеет значения, находится ли ваш планировщик на виртуальной машине или на POD). Мы также добавили документацию о том, где разместить этот файл: https://airflow.apache.org/docs/stable/concepts.html

В pod_mutation_hook сейчас используется всякий раз, когда вы используете KubernetesExecutor или KubernetePodOperator. POD, запущенные KubernetesExecutor или KubernetePodOperator, будут использовать этот перехватчик мутации.

Теперь вернемся к configmap. Случай, когда вы используетеKubernetesExecutor и у вас есть задача, использующая KuberneretPodOperator, вам нужны оба airflow.cfg а также airflow_local_settings.py файл, который будет существовать в рабочих модулях, запущенных KubernetesExecutor.

KubernetesExecutor запускает Worker Pod для этой задачи.

Модуль планировщика ---> Worker Pod (Pod_1 - запускается KubernetesExecuetor) -> (Pod_2 - запускается Pod_1 задачей с помощью KubernetePodOperator)

Теперь весь раздел [kubernetes] в airflow.cfg ( https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg) используется только для KubernetesExecutor и влияет на то, что монтируется на рабочих модулях, запущенных KubernetesExecutor.

Если вы не укажете airflow_local_settingsconfigmap, файл airflow_local_settings не будет подключен к рабочему модулю (Pod_1 в приведенном выше примере), и будет подключен только файл airflow.cfg. Итак, теперь для Pod_2 (запущенного Pod_1) - (особый случай, когда вы используете KubernetesPodOperator с KubernetesExecutor), поскольку Pod_1 (рабочий POD) не имеетairflow_local_settings.py файл, даже если он есть в Планировщике, Pod_2 не будет изменен, поскольку файл там не существует.

Считайте это то же самое, что и airflow.cfg - зачем вы монтируете airflow.cfgфайл как в Scheduler POD, так и в Work POD. Аналогично для этого крайнего случая вам понадобитсяairflow_local_settings.py файл в обоих местах.

https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/kubernetes/worker_configuration.py -> Этот код используется для определения того, что установлено на Worker Pod (REF_1)

https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/executors/kubernetes_executor.py -> Pod, созданный для каждой задачи, запускаемой KubernetesExecutor (мутация, применяется к POD is_2) - ROD поскольку он запускается планировщиком и имеетairflow_local_settings.py файл

https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py -> Этот код используется для создания нового оператора Kubernetes. -- Посколькуairflow_local_settings.pyне был установлен на POD, созданный в REF_2, мутации не были применены к этому POD.

Вы устанавливаете "airflow_local_settings_configmap = airflow-configmap" в поле airflow.cfg?

У меня была такая же проблема, убедитесь, что airflow_local_settingsможно импортировать из планировщика. Вам нужно будет запечь эти изменения в изображениях.

WORKDIR ${AIRFLOW_USER_HOME}
ENV PYTHONPATH  $PYTHONPATH:$AIRFLOW_HOME/config/
COPY airflow_local_settings.py $AIRFLOW_HOME/config/airflow_local_settings.py

Используя конфигурационную карту, которую вы выделили выше, вы получите их в исполнители, но в этот момент это не нужно, так что это своего рода бесполезная настройка. Не стесняйтесь читать исходный код:

https://github.com/apache/airflow/blob/8465d66f05baeb73dd4479b019515c069444616e/airflow/settings.py

Другие вопросы по тегам