Функция pod_mutation_hook не работает с воздушным потоком, работающим в кубернетах с использованием KubernetesExecutor
Я пытаюсь перенести развертывание воздушного потока в кубернетах из CeleryExecutor
к KubernetesExecutor
. В моей локальной среде разработки (работающей на minikube) все прошло гладко, однако мне нужно загрузить контейнер sidecar в производстве, чтобы запустить прокси, который позволяет мне подключаться к моей базе данных sql. После некоторого поиска в Google выяснилось, что определение функции pod_mutation_hook вairflow_local_settings.py
файл где-нибудь на $PYTHONPATH
вот как это нужно делать.
Сначала я попытался определить это на карте конфигурации для этого примера. например
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-config
namespace: dev
data:
...
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: "airflow-logs"
AIRFLOW__KUBERNETES__AIRFLOW_LOCAL_SETTINGS_CONFIGMAP: "airflow-config"
...
airflow_local_settings.py: |
from airflow.contrib.kubernetes.pod import Pod
def pod_mutation_hook(pod: Pod):
extra_labels = {
"test-label": "True",
}
pod.labels.update(extra_labels)
Я указал эту конфигурационную карту в airflow.cfg
файл, и он отлично подбирается и монтируется, все остальные переменные env работают правильно, но pod_mutation_hook
не запускается, так как никакие метки не добавляются к результирующему модулю, запущенному исполнителем kubernetes (обратите внимание, что здесь также указано требование тома журналов, и оно работает правильно).
Затем я попытался определить airflow_local_settings.py
файл на изображении, который запускает воздушный поток для задания под $AIRFLOW_HOME/configs/airflow_local_settings.py
как предлагается в комментарии здесь. Я также удалил соответствующие разделы изairflow-config
configmap выше. Это также не повлияло на результирующий модуль, созданный для задания, поскольку он также не имел указанных меток.
Итак, я не уверен, как действовать на этом этапе, потому что я не понимаю, как я должен указывать airflow_local_settings.py
файл и pod_mutation_hook
функционируют таким образом, что они фактически изменяют модуль перед запуском. Любая помощь будет принята с благодарностью. Спасибо.
3 ответа
Резюме:
Вы должны положить свой airflow_local_settings.py
файл на PYTHONPATH
для Планировщика, по крайней мере, если вы хотите, чтобы боковые вагоны на всех подах, запущенных KubernetesExecutor или KubernetesPodOperator (с другим Executor), поскольку POD для них обоих запускались планировщиком.
Однако, если вам также нужны коляски на POD, запущенные KubernetesPodOperator
когда используешь KubernetesExecutor
вам нужно будет установить airflow_local_settings_configmap
в airflow.cfg
(как это сделано на https://github.com/astronomer/airflow-chart/blob/f3dddeffe43c92d594cfcfe9c5b001680f45a986/templates/configmap.yaml), как при использовании KubernetePodOperator с KubernetesExecutor (с помощью KubernetesExecutor) рабочий POD.
Обратите внимание, как мы также передаем ту же конфигурационную карту для развертывания планировщика (https://github.com/astronomer/airflow-chart/blob/f3dddeffe43c92d594cfcfe9c5b001680f45a986/templates/scheduler/scheduler-deployment.yaml)airflow.cfg
сам тоже, так как мы хотим, чтобы все POD мутировали через pod_mutation_hook.
Детали:
В планировщике должны присутствовать файлы "airflow.cfg" и "airflow_local_settings.py" (здесь не имеет значения, находится ли ваш планировщик на виртуальной машине или на POD). Мы также добавили документацию о том, где разместить этот файл: https://airflow.apache.org/docs/stable/concepts.html
В pod_mutation_hook
сейчас используется всякий раз, когда вы используете KubernetesExecutor
или KubernetePodOperator
. POD, запущенные KubernetesExecutor или KubernetePodOperator, будут использовать этот перехватчик мутации.
Теперь вернемся к configmap. Случай, когда вы используетеKubernetesExecutor
и у вас есть задача, использующая KuberneretPodOperator, вам нужны оба airflow.cfg
а также airflow_local_settings.py
файл, который будет существовать в рабочих модулях, запущенных KubernetesExecutor.
KubernetesExecutor запускает Worker Pod для этой задачи.
Модуль планировщика ---> Worker Pod (Pod_1 - запускается KubernetesExecuetor) -> (Pod_2 - запускается Pod_1 задачей с помощью KubernetePodOperator)
Теперь весь раздел [kubernetes] в airflow.cfg ( https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg) используется только для KubernetesExecutor и влияет на то, что монтируется на рабочих модулях, запущенных KubernetesExecutor.
Если вы не укажете airflow_local_settings
configmap, файл airflow_local_settings не будет подключен к рабочему модулю (Pod_1 в приведенном выше примере), и будет подключен только файл airflow.cfg. Итак, теперь для Pod_2 (запущенного Pod_1) - (особый случай, когда вы используете KubernetesPodOperator с KubernetesExecutor), поскольку Pod_1 (рабочий POD) не имеетairflow_local_settings.py
файл, даже если он есть в Планировщике, Pod_2 не будет изменен, поскольку файл там не существует.
Считайте это то же самое, что и airflow.cfg - зачем вы монтируете airflow.cfg
файл как в Scheduler POD, так и в Work POD. Аналогично для этого крайнего случая вам понадобитсяairflow_local_settings.py
файл в обоих местах.
https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/kubernetes/worker_configuration.py -> Этот код используется для определения того, что установлено на Worker Pod (REF_1)
https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/executors/kubernetes_executor.py -> Pod, созданный для каждой задачи, запускаемой KubernetesExecutor (мутация, применяется к POD is_2) - ROD поскольку он запускается планировщиком и имеетairflow_local_settings.py
файл
https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py -> Этот код используется для создания нового оператора Kubernetes. -- Посколькуairflow_local_settings.py
не был установлен на POD, созданный в REF_2, мутации не были применены к этому POD.
Вы устанавливаете "airflow_local_settings_configmap = airflow-configmap" в поле airflow.cfg?
У меня была такая же проблема, убедитесь, что airflow_local_settings
можно импортировать из планировщика. Вам нужно будет запечь эти изменения в изображениях.
WORKDIR ${AIRFLOW_USER_HOME}
ENV PYTHONPATH $PYTHONPATH:$AIRFLOW_HOME/config/
COPY airflow_local_settings.py $AIRFLOW_HOME/config/airflow_local_settings.py
Используя конфигурационную карту, которую вы выделили выше, вы получите их в исполнители, но в этот момент это не нужно, так что это своего рода бесполезная настройка. Не стесняйтесь читать исходный код:
https://github.com/apache/airflow/blob/8465d66f05baeb73dd4479b019515c069444616e/airflow/settings.py