dag_id could not be found with Airflow KubernetesPodOperator (Minikube) and mounted DAGs
I am trying to run a DAG based on KubernetesPodOperator with Minikube. The DAGs are mounted from my local machine. The problem is that the DAGs are not found by the pod:
[2021-09-18 13:00:11,334] {dagbag.py:496} INFO - Filling up the DagBag from /opt/airflow/dags/composer_sample_kubernetes_pod.py
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 220, in task_run
    dag = get_dag(args.subdir, args.dag_id)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 193, in get_dag
    'parse.'.format(dag_id)
airflow.exceptions.AirflowException: dag_id could not be found: composer_sample_kubernetes_pod. Either the dag did not exist or it failed to parse.
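To narrow this down, the DAG file can be checked from inside the cluster (the pod name below is a placeholder; the real name comes from kubectl get pods, and -c <container> may be needed if the pod runs several containers):

kubectl get pods -n airflow
kubectl exec -n airflow <scheduler-pod> -- ls /opt/airflow/dags
kubectl exec -n airflow <scheduler-pod> -- airflow dags list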
How Airflow is installed:
helm install $AIRFLOW_NAME apache-airflow/airflow \
--values values.yml \
--set logs.persistence.enabled=true \
--namespace $AIRFLOW_NAMESPACE \
--kubeconfig ~/.kube/config
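As a sanity check of what the release actually received, the applied values can be printed back:

helm get values $AIRFLOW_NAME --namespace $AIRFLOW_NAMESPACE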
values.yml:
uid: 50000
gid: 50000
executor: KubernetesExecutor
config:
  core:
    dags_folder: /opt/airflow/dags
webserver:
  extraVolumes:
    - name: dags
      hostPath:
        path: /mnt/airflow/dags
        type: Directory
  extraVolumeMounts:
    - name: dags
      mountPath: /opt/airflow/dags
scheduler:
  extraVolumes:
    - name: dags
      hostPath:
        path: /mnt/airflow/dags
  extraVolumeMounts:
    - name: dags
      mountPath: /opt/airflow/dags
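To verify that these extraVolumes/extraVolumeMounts really end up in the scheduler pod spec, the live pod can be inspected (component=scheduler is, as far as I can tell, the label the official chart puts on its pods):

kubectl get pods -n airflow -l component=scheduler -o yaml | grep -A 3 'name: dags'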
DAG:
import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    kubernetes_min_pod = KubernetesPodOperator(
        task_id='pod-ex-minimum',
        name='pod-ex-minimum',
        cmds=['python', 'labelling.py'],
        namespace='airflow',
        is_delete_operator_pod=False,
        get_logs=True,
        image='toto',
        pod_template_file='/opt/airflow/dags/pod_template_file.yml'
    )
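Since this file has to be parsed inside the pod, a quick local check that it at least compiles/imports cleanly rules out syntax problems (the path is the local dags directory from the minikube mount below; the second command additionally assumes Airflow and the cncf.kubernetes provider are installed locally):

python -m py_compile /home/path/dags/composer_sample_kubernetes_pod.py
python /home/path/dags/composer_sample_kubernetes_pod.py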
pod_template_file.yml:
---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
  labels:
    tier: airflow
    component: worker
    release: airflow
spec:
  containers:
    - args: []
      command: []
      envFrom: []
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: airflow-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: airflow-airflow-metadata
              key: connection
        - name: AIRFLOW__WEBSERVER__SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: airflow-webserver-secret-key
              key: webserver-secret-key
        # Dynamically created environment variables
        # Dynamically created secret envs
        # Extra env
      image: apache/airflow:2.1.2
      imagePullPolicy: IfNotPresent
      name: base
      ports: []
      resources: {}
      volumeMounts:
        - name: logs
          mountPath: "/opt/airflow/logs"
        - name: dags
          mountPath: "/opt/airflow/dags"
        - name: config
          mountPath: "/opt/airflow/airflow.cfg"
          subPath: airflow.cfg
          readOnly: true
  hostNetwork: false
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 0
  nodeSelector: {}
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - podAffinityTerm:
            labelSelector:
              matchLabels:
                component: worker
            topologyKey: kubernetes.io/hostname
          weight: 100
  tolerations: []
  serviceAccountName: airflow-worker
  volumes:
    - name: logs
      persistentVolumeClaim:
        claimName: airflow-logs
    - name: config
      configMap:
        name: airflow-airflow-config
    - name: dags
      hostPath:
        # directory location on host
        path: /mnt/airflow/dags
        # this field is optional
        type: Directory
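To rule out a malformed template, the file can be validated client-side without creating anything:

kubectl apply --dry-run=client -f pod_template_file.yml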
How Minikube is started with the mount point:
minikube start --mount=true --mount-string=/home/path/dags/:/mnt/airflow/dags/
So the DAGs are mounted from my local machine onto the Minikube host VM, and from there into the pod that is supposed to run the DAG.
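The first hop can be checked directly on the Minikube VM, which should list the mounted DAG files:

minikube ssh "ls -la /mnt/airflow/dags"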
I don't understand what is missing for the pods to know about my DAGs. What am I overlooking?