KubernetesPodOperator не может получить доступ к соединениям Airflow, хранящимся в Google Secrets Manager.

я использую Composer version 2.0.0 Airflow 2.1.4и я создал файл, который пытается получить доступ к соединению Airflow, хранящемуся в Google Secrets Manager. Но он не может найти учетные данные (см. исключения ниже). Я также пытался передать секреты подключения воздушного потока env_varsв KubernetesPodOperatorа также пытался передать его как Kubernetes Secretsк стручку, но все равно не повезло.

Ниже приведен мой код для обоих случаев выше:

      aws_uri = BaseHook.get_connection('aws_conn').get_uri()

download_file = KubernetesPodOperator(
        task_id="download_file_s3_to_gcs",
        dag=dag,
        name="download_file_s3_to_gcs",
        namespace=NAMESPACE,
        in_cluster=True,
        image=IMAGE_NAME,
        arguments=[
            "python3",
            "%s" % FILENAME,
        ],
        service_account_name=K_SERVICE_ACCOUNT,
        env_vars=[k8s.V1EnvVar(name="gcp_conn_id", value=GCP_CONN_ID), k8s.V1EnvVar(name="aws_conn_id", value=aws_uri)],
        is_delete_operator_pod=True,
    )
      secret_aws_conn_id = Secret(
        deploy_type='env',
        deploy_target='AWS_CONN_ID',
        secret='aws-conn-id', // my kubernetes secret
        key='aws-conn-key',
    )

download_file = KubernetesPodOperator(
        task_id="download_file_s3_to_gcs",
        dag=dag,
        name="download_file_s3_to_gcs",
        namespace=NAMESPACE,
        in_cluster=True,
        image=IMAGE_NAME,
        arguments=[
            "python3",
            "%s" % FILENAME,
        ],
        service_account_name=K_SERVICE_ACCOUNT,
        secrets=[secret_aws_conn_id],
        env_vars=[k8s.V1EnvVar(name="gcp_conn_id", value=GCP_CONN_ID)],
        is_delete_operator_pod=True,
    )

Хотя, когда я печатаю connection_idURI я могу правильно получить URI для него, но задание всегда выдает исключения ниже

      [2022-04-01 08:01:09,563] {pod_manager.py:197} INFO - botocore.exceptions.NoCredentialsError: Unable to locate credentials

[2022-04-01 08:01:09,374] {pod_manager.py:197} INFO - sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: connection
[2022-04-01 08:01:09,375] {pod_manager.py:197} INFO - [SQL: SELECT connection.password AS connection_password, connection.extra AS connection_extra, connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.description AS connection_description, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.port AS connection_port, connection.is_encrypted AS connection_is_encrypted, connection.is_extra_encrypted AS connection_is_extra_encrypted
[2022-04-01 08:01:09,375] {pod_manager.py:197} INFO - FROM connection
[2022-04-01 08:01:09,376] {pod_manager.py:197} INFO - WHERE connection.conn_id = ?
[2022-04-01 08:01:09,376] {pod_manager.py:197} INFO -  LIMIT ? OFFSET ?]

[2022-04-01 08:01:09,354] {pod_manager.py:197} INFO - [[34m2022-04-01 08:01:09,297[0m] {[34mconnection.py:[0m407} ERROR[0m - Unable to retrieve connection from secrets backend (MetastoreBackend). 
Checking subsequent secrets backend.[0m

Может ли кто-нибудь помочь мне решить эту проблему?

1 ответ

Вы не можете напрямую передавать подключения Airflow от Airflow к модулю.

Журнал ошибок показывает, что у вас есть код оператора Airflow, работающий внутри Pod. Этот код вызовет диспетчер секретов/базу данных метаданных Airflow для получения учетных данных на основе идентификатора соединения. Как видно из журнала, Pod не может получить доступ к базе данных метаданных.

Я бы рекомендовал передавать учетные данные GCP в качестве секрета Kubernetes и использовать клиент Python GCS внутри контейнера. Сам контейнер не должен зависеть от Airflow — Airflow только планирует его.