Airflow 2.0.1: переопределение шаблона пода не работает должным образом для KubernetesExecutor

Настройка: Airflow 2.0.1 с Kubernetes 1.18 и Python 3.8, клиент Kubernetes: 18.17.x

Файл шаблона пакета:

      apiVersion: v1
kind: Pod
metadata:
  name: workerPod

spec:
  containers:
    - args: []
      command: []
      env:
        - name: <Key>
          value: "<value>"
      envFrom: []
      name: base
      image: "<image_name>"
      imagePullSecrets: [name: "<image_pull_secrets>"]
      imagePullPolicy: "Always"
      ports: []
      volumeMounts:
        - mountPath: "<path>"
          name: "<name>"

Конфигурация по умолчанию, установленная в airflow.cfg, выглядит следующим образом:

      [kubernetes]
pod_template_file = <path to template file>
worker_container_repository = <base-default-image>
worker_container_tag = <tag>
namespace = airflow
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = False
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
dags_in_image = True
dags_volume_mount_point = <volume-mount-point>
image_pull_secrets = <default-pull-secrets>

Проблема в том, что, хотя некоторые ключи правильно считываются, например, из pod_template_file, я могу видеть все env variables быть установлен правильно, а также imagePullPolicy также правильно читается (подтверждается переопределением значения imagePullPolicy: "Always" из imagePullPolicy: "IfNotPresent"), но ключ для не читается правильно. Я могу подтвердить это, так как я получаю Base credentials not providedошибка при извлечении изображения из репозитория ecr. Я подтвердил, что учетные данные верны, и я могу создать модуль, пытаясь сделать это явно.

Даже при попытке установить imagePullSecrets в airflow.cfg напрямую, я все равно получаю ту же ошибку.

Я также попытался создать переопределение модуля, используя API V1 явно следующим образом:

      start_task = PythonOperator(
            task_id=<start_task_id>, python_callable=<start_task_callabel>, op_args=[<args>], dag=dag,
            executor_config={
                "pod_template_file": "<path_to_template>",
                "pod_override": k8s.V1Pod(
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                image="<image_override>",
                                image_pull_policy="<pull_policy>"
                            ),
                        ],
                        image_pull_secrets=[k8s.V1LocalObjectReference('<image_pull_secrets>')],
                    )
                ),
            },
        )

В этом случае я могу правильно загрузить образ докера без каких-либо ошибок аутентификации. Но, к сожалению, модуль выдает ошибку: AttributeError: 'V1Container' object has no attribute '_startup_probe'

      Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/dist-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 234, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 120, in _run_task_by_local_task_job
    run_job.run()
  File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/local_task_job.py", line 84, in _execute
    if not self.task_instance.check_and_change_state_before_execution(
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1029, in check_and_change_state_before_execution
    session.commit()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 1046, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2540, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2682, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2642, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 586, in execute
    persistence.save_obj(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 230, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 885, in _emit_update_statements
    for (
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 626, in _collect_update_commands
    state.manager[propkey].impl.is_equal(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/sql/sqltypes.py", line 1738, in compare_values
    return x == y
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 221, in __eq__
    return self.to_dict() == other.to_dict()
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 196, in to_dict
    result[attr] = value.to_dict()
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1004, in to_dict
    result[attr] = list(map(
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1005, in <lambda>
    lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 660, in to_dict
    value = getattr(self, attr)
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 458, in startup_probe
    return self._startup_probe
AttributeError: 'V1Container' object has no attribute '_startup_probe'

2 ответа

У меня была аналогичная проблема. Проблема заключалась в том, что мы изменили наши контейнеры воздушного потока и обновили библиотеку Kubernetes в новых контейнерах. Не обязательно проблема с новой библиотекой Kubernetes, но Airflow сериализовал некоторые объекты (в нашем случае TaskInstance, по-видимому, также имеет место в вашем случае в соответствии с общей обратной трассировкой), и он десериализует их и создает из них объект Python. Итак, в вашем случае он воссоздает V1Containerобъект из сериализованной формы, которую он имел. Новый объект в вашем случае структурирован в Python таким образом , что в его инициализаторе установлен атрибут _startup_probe. Но сериализованная версия не имеет этого атрибута, поэтому кажется, что это версия до этой фиксации. Кажется, что десериализация не вызывает проблем, но всякий раз, когда используется метод to_dict, возникают проблемы. В вашем случае он используется для сравнения (eq), для меня это было при регистрации, поскольку repr использует его.

Сообщество Airflow Slack указало мне на это изменение , которое должно решить эту проблему. Я еще не смог это проверить, но уже делюсь этим здесь на случай, если кто-то попадет в него.

У меня была похожая проблема, которая начала возникать, когда мы обновили нашу версию воздушного потока. проблема заключалась в том, что мы устанавливали более старую версию kubernetes, которая была несовместима с последним воздушным потоком, при создании пользовательского контейнера kubernetes (в файле докеров). Использование последней версии kubernetes в dockerfile устранило проблему.

Другие вопросы по тегам