dag_id could not be found with Airflow KubernetesPodOperator (Minikube) and mounted DAGs

I am trying to run a KubernetesPodOperator-based DAG on Minikube. The DAGs are mounted from my local machine. The problem is that the DAGs are not picked up by the pod:

[2021-09-18 13:00:11,334] {dagbag.py:496} INFO - Filling up the DagBag from /opt/airflow/dags/composer_sample_kubernetes_pod.py
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 220, in task_run
    dag = get_dag(args.subdir, args.dag_id)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 193, in get_dag
    'parse.'.format(dag_id)
airflow.exceptions.AirflowException: dag_id could not be found: composer_sample_kubernetes_pod. Either the dag did not exist or it failed to parse.
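
Since the error says the DAG either does not exist or failed to parse, the first thing I can check is whether the scheduler itself sees the DAG. This is only a sketch of how I would do it; the component=scheduler label follows the official chart's defaults, and I assume the release lives in the airflow namespace (the same namespace the DAG targets):

# look up the scheduler pod by its chart label, then list the DAGs it has parsed
SCHEDULER_POD=$(kubectl get pods -n airflow -l component=scheduler \
  -o jsonpath='{.items[0].metadata.name}')
kubectl exec -n airflow "$SCHEDULER_POD" -- airflow dags list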

How Airflow is installed:

helm install $AIRFLOW_NAME apache-airflow/airflow \
    --values values.yml \
    --set logs.persistence.enabled=true \
    --namespace $AIRFLOW_NAMESPACE \
    --kubeconfig ~/.kube/config
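
As a quick sanity check after installing, I list what the chart actually deployed (same shell variables as above):

# confirm the webserver, scheduler, etc. are running in the target namespace
kubectl get pods -n "$AIRFLOW_NAMESPACE"
helm status "$AIRFLOW_NAME" --namespace "$AIRFLOW_NAMESPACE"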

values.yml:

uid: 50000
gid: 50000

executor: KubernetesExecutor
config:
  core:
    dags_folder: /opt/airflow/dags
webserver:
  extraVolumes:
    - name: dags
      hostPath:
        path: /mnt/airflow/dags
        type: Directory
  extraVolumeMounts:
    - name: dags
      mountPath: /opt/airflow/dags
scheduler:
  extraVolumes:
    - name: dags
      hostPath:
        path: /mnt/airflow/dags
  extraVolumeMounts:
    - name: dags
      mountPath: /opt/airflow/dags
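
To verify that these extraVolumes/extraVolumeMounts actually end up on the scheduler, the mounted directory can be inspected from inside its pod. Again a sketch only; the component=scheduler label is the chart's default and the airflow namespace is an assumption:

# show the volume mounts the scheduler pod was created with
kubectl describe pod -n airflow -l component=scheduler | grep -A 10 'Mounts:'

# check that the DAG files are actually visible inside the container
SCHEDULER_POD=$(kubectl get pods -n airflow -l component=scheduler \
  -o jsonpath='{.items[0].metadata.name}')
kubectl exec -n airflow "$SCHEDULER_POD" -- ls -la /opt/airflow/dags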

DAG:

import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

from kubernetes.client import models as k8s

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:

    kubernetes_min_pod = KubernetesPodOperator(
        task_id='pod-ex-minimum',
        name='pod-ex-minimum',
        cmds=['python', 'labelling.py'],
        namespace='airflow',
        is_delete_operator_pod=False,
        get_logs=True,
        image='toto',
        pod_template_file='/opt/airflow/dags/pod_template_file.yml'
    )
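
The path in the traceback matches where this file is mounted, so another thing I can try is importing the file by hand inside the scheduler container to surface any import error. A rough sketch (same pod lookup as above; it also assumes the cncf.kubernetes provider is available in the image):

SCHEDULER_POD=$(kubectl get pods -n airflow -l component=scheduler \
  -o jsonpath='{.items[0].metadata.name}')
# executing the file directly will fail loudly if any import in the DAG is broken
kubectl exec -n airflow "$SCHEDULER_POD" -- python /opt/airflow/dags/composer_sample_kubernetes_pod.py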

pod_template_file.yml

---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
  labels:
    tier: airflow
    component: worker
    release: airflow
spec:
  containers:
    - args: []
      command: []
      envFrom:      
        []
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor      
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: airflow-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: airflow-airflow-metadata
              key: connection
        - name: AIRFLOW__WEBSERVER__SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: airflow-webserver-secret-key
              key: webserver-secret-key      
        # Dynamically created environment variables
        # Dynamically created secret envs
        
        # Extra env
      image: apache/airflow:2.1.2
      imagePullPolicy: IfNotPresent
      name: base
      ports: []
      resources:
        {}
      volumeMounts:
        - name: logs
          mountPath: "/opt/airflow/logs"
        - name: dags
          mountPath: "/opt/airflow/dags"
        - name: config
          mountPath: "/opt/airflow/airflow.cfg"
          subPath: airflow.cfg
          readOnly: true
  hostNetwork: false
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 0
  nodeSelector: 
    {}
  affinity: 
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - podAffinityTerm:
          labelSelector:
            matchLabels:
              component: worker
          topologyKey: kubernetes.io/hostname
        weight: 100
  tolerations: 
    []
  serviceAccountName: airflow-worker
  volumes:
  - name: logs
    persistentVolumeClaim:
      claimName: airflow-logs
  - name: config
    configMap:
      name: airflow-airflow-config
  - name: dags
    hostPath:
      # directory location on host
      path: /mnt/airflow/dags
      # this field is optional
      type: Directory
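
The task pods spawned by the executor carry the same component=worker label that this template uses, so if one of them is still around after the failure, its spec can be compared with this template. A sketch of how I would inspect it (the airflow namespace is an assumption):

# list leftover task pods and check whether the dags hostPath volume was mounted
kubectl get pods -n airflow -l component=worker
kubectl describe pod -n airflow -l component=worker | grep -A 10 'Mounts:'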

How Minikube is started, with the local DAG folder mounted into the node:

minikube start --mount=true --mount-string=/home/path/dags/:/mnt/airflow/dags/
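
To make sure this mount actually reaches the Minikube node, the directory can be listed from inside the VM (default profile assumed):

# the mounted directory should show the same files as /home/path/dags on my machine
minikube ssh -- ls -la /mnt/airflow/dags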

So the DAGs are mounted from my local machine onto the Minikube host, and then from the host into the pod that is supposed to run the DAG.

I don't understand what is missing for the pods to pick up my DAGs.
