Airflow 2.0.1: переопределение шаблона пода не работает должным образом для KubernetesExecutor
Настройка: Airflow 2.0.1 с Kubernetes 1.18 и Python 3.8, клиент Kubernetes: 18.17.x
Файл шаблона пакета:
apiVersion: v1
kind: Pod
metadata:
name: workerPod
spec:
containers:
- args: []
command: []
env:
- name: <Key>
value: "<value>"
envFrom: []
name: base
image: "<image_name>"
imagePullSecrets: [name: "<image_pull_secrets>"]
imagePullPolicy: "Always"
ports: []
volumeMounts:
- mountPath: "<path>"
name: "<name>"
Конфигурация по умолчанию, установленная в airflow.cfg, выглядит следующим образом:
[kubernetes]
pod_template_file = <path to template file>
worker_container_repository = <base-default-image>
worker_container_tag = <tag>
namespace = airflow
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = False
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
dags_in_image = True
dags_volume_mount_point = <volume-mount-point>
image_pull_secrets = <default-pull-secrets>
Проблема в том, что, хотя некоторые ключи правильно считываются, например, из pod_template_file, я могу видеть все
env variables
быть установлен правильно, а также
imagePullPolicy
также правильно читается (подтверждается переопределением значения
imagePullPolicy: "Always"
из
imagePullPolicy: "IfNotPresent"
), но ключ для не читается правильно. Я могу подтвердить это, так как я получаю
Base credentials not provided
ошибка при извлечении изображения из репозитория ecr. Я подтвердил, что учетные данные верны, и я могу создать модуль, пытаясь сделать это явно.
Даже при попытке установить
imagePullSecrets
в
airflow.cfg
напрямую, я все равно получаю ту же ошибку.
Я также попытался создать переопределение модуля, используя API V1 явно следующим образом:
start_task = PythonOperator(
task_id=<start_task_id>, python_callable=<start_task_callabel>, op_args=[<args>], dag=dag,
executor_config={
"pod_template_file": "<path_to_template>",
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
image="<image_override>",
image_pull_policy="<pull_policy>"
),
],
image_pull_secrets=[k8s.V1LocalObjectReference('<image_pull_secrets>')],
)
),
},
)
В этом случае я могу правильно загрузить образ докера без каких-либо ошибок аутентификации. Но, к сожалению, модуль выдает ошибку:
AttributeError: 'V1Container' object has no attribute '_startup_probe'
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.8/dist-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/utils/cli.py", line 89, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 234, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
_run_task_by_local_task_job(args, ti)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 120, in _run_task_by_local_task_job
run_job.run()
File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/base_job.py", line 237, in run
self._execute()
File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/local_task_job.py", line 84, in _execute
if not self.task_instance.check_and_change_state_before_execution(
File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 65, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1029, in check_and_change_state_before_execution
session.commit()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 1046, in commit
self.transaction.commit()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 504, in commit
self._prepare_impl()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2540, in flush
self._flush(objects)
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2682, in _flush
transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.raise_(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2642, in _flush
flush_context.execute()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 586, in execute
persistence.save_obj(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 230, in save_obj
_emit_update_statements(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 885, in _emit_update_statements
for (
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 626, in _collect_update_commands
state.manager[propkey].impl.is_equal(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/sql/sqltypes.py", line 1738, in compare_values
return x == y
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 221, in __eq__
return self.to_dict() == other.to_dict()
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 196, in to_dict
result[attr] = value.to_dict()
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1004, in to_dict
result[attr] = list(map(
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1005, in <lambda>
lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 660, in to_dict
value = getattr(self, attr)
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 458, in startup_probe
return self._startup_probe
AttributeError: 'V1Container' object has no attribute '_startup_probe'
2 ответа
У меня была аналогичная проблема. Проблема заключалась в том, что мы изменили наши контейнеры воздушного потока и обновили библиотеку Kubernetes в новых контейнерах. Не обязательно проблема с новой библиотекой Kubernetes, но Airflow сериализовал некоторые объекты (в нашем случае TaskInstance, по-видимому, также имеет место в вашем случае в соответствии с общей обратной трассировкой), и он десериализует их и создает из них объект Python. Итак, в вашем случае он воссоздает
V1Container
объект из сериализованной формы, которую он имел. Новый объект в вашем случае структурирован в Python таким образом , что в его инициализаторе установлен атрибут _startup_probe. Но сериализованная версия не имеет этого атрибута, поэтому кажется, что это версия до этой фиксации. Кажется, что десериализация не вызывает проблем, но всякий раз, когда используется метод to_dict, возникают проблемы. В вашем случае он используется для сравнения (eq), для меня это было при регистрации, поскольку repr использует его.
Сообщество Airflow Slack указало мне на это изменение , которое должно решить эту проблему. Я еще не смог это проверить, но уже делюсь этим здесь на случай, если кто-то попадет в него.
У меня была похожая проблема, которая начала возникать, когда мы обновили нашу версию воздушного потока. проблема заключалась в том, что мы устанавливали более старую версию kubernetes, которая была несовместима с последним воздушным потоком, при создании пользовательского контейнера kubernetes (в файле докеров). Использование последней версии kubernetes в dockerfile устранило проблему.