Airflow Kubernetes Executor error (dag_id could not be found)
I'm trying to take my first steps with Airflow (I decided to start with the latest released version, 2.1.0).
Below you can find the steps I followed.
I should point out that I'm running the test locally with Minikube.
I started by building my own image, using the following Dockerfile:
FROM python:3.8-slim
LABEL maintainer="Paolo"
ENV DEBIAN_FRONTEND noninteractive
ENV TERM linux
# Airflow
ARG AIRFLOW_VERSION=2.1.0
ARG AIRFLOW_USER_HOME=/usr/local/airflow
ENV AIRFLOW_HOME=${AIRFLOW_USER_HOME}
# Define en_US.
ENV LANGUAGE en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LC_ALL en_US.UTF-8
ENV LC_CTYPE en_US.UTF-8
ENV LC_MESSAGES en_US.UTF-8
COPY ./constraints-3.8.txt /constraints-3.8.txt
RUN set -ex \
    && buildDeps=' \
        freetds-dev \
        libkrb5-dev \
        libsasl2-dev \
        libssl-dev \
        libffi-dev \
        libpq-dev \
        git \
    ' \
    && apt-get update -yqq \
    && apt-get upgrade -yqq \
    && apt-get install -yqq --no-install-recommends \
        $buildDeps \
        freetds-bin \
        build-essential \
        default-libmysqlclient-dev \
        apt-utils \
        curl \
        vim \
        rsync \
        netcat \
        locales \
    && sed -i 's/^# en_US.UTF-8 UTF-8$/en_US.UTF-8 UTF-8/g' /etc/locale.gen \
    && locale-gen \
    && update-locale LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 \
    && useradd -ms /bin/bash -d ${AIRFLOW_USER_HOME} airflow \
    && pip install --upgrade "pip==20.2.4" \
    && pip install -U pip setuptools wheel \
    && pip install pytz \
    && pip install pyOpenSSL \
    && pip install ndg-httpsclient \
    && pip install pyasn1 \
    && pip install apache-airflow[kubernetes,postgres,ssh]==${AIRFLOW_VERSION} --constraint constraints-3.8.txt \
    && pip install 'redis==3.2' \
    && apt-get purge --auto-remove -yqq $buildDeps \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf \
        /var/lib/apt/lists/* \
        /tmp/* \
        /var/tmp/* \
        /usr/share/man \
        /usr/share/doc \
        /usr/share/doc-base
COPY config/airflow.cfg ${AIRFLOW_USER_HOME}/airflow.cfg
COPY script/entrypoint.sh ${AIRFLOW_USER_HOME}/entrypoint.sh
RUN chown -R airflow: ${AIRFLOW_USER_HOME} && chmod -R 775 ${AIRFLOW_USER_HOME}
EXPOSE 8080 5555 8793
USER airflow
WORKDIR ${AIRFLOW_USER_HOME}
(The entrypoint script simply starts two processes: the scheduler and the webserver.)
And this is the airflow.cfg file I'm using:
[core]
dags_folder = /usr/local/airflow/dags
base_log_folder = /usr/local/airflow/logs
logging_level = DEBUG
executor = KubernetesExecutor
parallelism = 32
load_examples = False
fernet_key = GHFnmOTWmEKuASev_ozuxmlfFEUedyMUz865IzvnUTY=
plugins_folder = /usr/local/airflow/plugins
sql_alchemy_conn = postgresql://root:root@postgres-airflow:5432/airflow
[scheduler]
dag_dir_list_interval = 300
child_process_log_directory = /usr/local/airflow/logs/scheduler
job_heartbeat_sec = 5
max_threads = 2
# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5
# after how much time a new DAGs should be picked up from the filesystem
min_file_process_interval = 0
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
# How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
min_file_parsing_loop_time = 1
print_stats_interval = 30
scheduler_zombie_task_threshold = 300
max_tis_per_query = 0
authenticate = False
# Turn off scheduler catchup by setting this to False.
# Default behavior is unchanged and
# Command Line Backfills still work, but the scheduler
# will not do scheduler catchup if this is False,
# however it can be set on a per DAG basis in the
# DAG definition (catchup)
catchup_by_default = True
[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://0.0.0.0:8000
rbac=True
# The ip specified when starting the web server
web_server_host = 0.0.0.0
# The port on which to run the web server
web_server_port = 8000
# Paths to the SSL certificate and key for the web server. When both are
# provided SSL will be enabled. This does not change the web server port.
web_server_ssl_cert =
web_server_ssl_key =
# Number of seconds the webserver waits before killing gunicorn master that doesn't respond
web_server_master_timeout = 120
# Number of seconds the gunicorn webserver waits before timing out on a worker
web_server_worker_timeout = 120
# Number of workers to refresh at a time. When set to 0, worker refresh is
# disabled. When nonzero, airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1
# Number of seconds to wait before refreshing a batch of workers.
worker_refresh_interval = 30
# Secret key used to run your flask app
secret_key = GHFnmOTWmEKuASev_ozuxmlfFEUedyMUz
# Number of workers to run the Gunicorn web server
workers = 4
# The worker class gunicorn should use. Choices include
# sync (default), eventlet, gevent
worker_class = sync
# Log files for the gunicorn webserver. '-' means log to stderr.
access_logfile = -
error_logfile = -
# Expose the configuration file in the web server
expose_config = False
# Default DAG view. Valid values are:
# tree, graph, duration, gantt, landing_times
dag_default_view = tree
# Default DAG orientation. Valid values are:
# LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
dag_orientation = LR
# Puts the webserver in demonstration mode; blurs the names of Operators for
# privacy.
demo_mode = False
# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
log_fetch_timeout_sec = 5
# By default, the webserver shows paused DAGs. Flip this to hide paused
# DAGs by default
hide_paused_dags_by_default = False
# Consistent page size across all listing views in the UI
page_size = 100
[kubernetes]
#airflow_configmap = airflow-configmap
worker_container_repository = pscarpino/airflow
worker_container_tag = latest
worker_container_image_pull_policy = Always
worker_service_account_name = airflow
namespace = airflow-example
delete_worker_pods = True
dags_in_image = True
git_repo = https://github.com/kipliko/airflow.git
git_branch = main
#git_subpath =
#git_user =
#git_password =
#git_sync_root =
#git_sync_path =
git_dags_folder_mount_point = /usr/local/airflow/dags
dags_volume_claim = airflow-dags-git
dags_volume_subpath =
logs_volume_claim = airflow-logs
#logs_volume_subpath =
#dags_volume_host =
#logs_volume_host =
in_cluster = True
#gcp_service_account_keys =
# Example affinity and toleration definitions.
affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]
# For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
git_sync_container_tag = v2.0.5
git_sync_init_container_name = git-sync-clone
[kubernetes_secrets]
SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn
[cli]
api_client = airflow.api.client.json_client
endpoint_url = http://0.0.0.0:8080
[api]
auth_backend = airflow.api.auth.backend.default
[github_enterprise]
api_rev = v3
[admin]
# UI to hide sensitive variable fields when set to True
hide_sensitive_variable_fields = True
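The [kubernetes] section above names three different sources for the DAG files: `dags_in_image = True`, `dags_volume_claim = airflow-dags-git` and `git_repo`. As a quick sanity check, the sketch below (standard library only) reads an airflow.cfg and reports which of these options are set and whether `dags_in_image = True` is combined with either of the others. The helper names are hypothetical, the check is not Airflow's own validation, and whether Airflow 2.1 still honours each of these options should be confirmed in the Airflow configuration reference.

```python
import configparser
import os

# Options from the [kubernetes] section above that describe where worker
# pods should get their DAG files from (names copied from that airflow.cfg).
DAG_SOURCE_KEYS = ("dags_in_image", "dags_volume_claim", "git_repo")


def load_cfg(cfg_text: str) -> configparser.ConfigParser:
    # interpolation=None: take values such as the JSON in `affinity` literally
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser


def dag_source_settings(cfg_text: str) -> dict:
    """Return the non-empty DAG-source options found in [kubernetes]."""
    parser = load_cfg(cfg_text)
    if not parser.has_section("kubernetes"):
        return {}
    section = parser["kubernetes"]
    return {key: section[key] for key in DAG_SOURCE_KEYS if section.get(key)}


def dag_source_warnings(cfg_text: str) -> list:
    """Hypothetical check: flag dags_in_image = True set alongside other sources."""
    settings = dag_source_settings(cfg_text)
    if not settings:
        return []
    in_image = load_cfg(cfg_text)["kubernetes"].getboolean("dags_in_image", fallback=False)
    return [
        f"dags_in_image = True is set together with {key}"
        for key in ("dags_volume_claim", "git_repo")
        if in_image and key in settings
    ]


if __name__ == "__main__":
    cfg_path = "/usr/local/airflow/airflow.cfg"  # AIRFLOW_HOME from the Dockerfile
    if os.path.exists(cfg_path):
        with open(cfg_path) as f:
            text = f.read()
        print(dag_source_settings(text))
        for warning in dag_source_warnings(text):
            print("warning:", warning)
```

Run inside the image, it reads the airflow.cfg copied by the Dockerfile; with the configuration above it would report both combinations.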
As for the deployment, as I said, I'm using Minikube with the following YAML file.
(The PersistentVolume and PersistentVolumeClaim "airflow-dags-git" were created earlier using a separate file.)
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow
  namespace: airflow-example
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow-example
  name: airflow
rules:
- apiGroups: [""] # "" indicates the core API group
  resources: ["pods", "pods/log"]
  verbs: ["get", "list", "watch", "create", "update", "delete"]
- apiGroups: ["batch", "extensions"]
  resources: ["jobs"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow
  namespace: airflow-example
subjects:
- kind: ServiceAccount
  name: airflow # Name of the ServiceAccount
  namespace: airflow-example
roleRef:
  kind: Role # This must be Role or ClusterRole
  name: airflow # This must match the name of the Role or ClusterRole you wish to bind to
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow-example
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      serviceAccountName: airflow
      initContainers:
      - name: init
        image: pscarpino/airflow
        imagePullPolicy: Always
        command:
        - "bash"
        args:
        - "-cx"
        - "/usr/local/airflow/entrypoint.sh"
        env:
        - name: AIRFLOW__CORE__FERNET_KEY
          value: GHFnmOTWmEKuASev_ozuxmlfFEUedyMUz865IzvnUTY=
      containers:
      - name: webserver
        image: pscarpino/airflow
        command:
        - airflow
        args:
        - webserver
        env:
        - name: AIRFLOW__CORE__FERNET_KEY
          value: GHFnmOTWmEKuASev_ozuxmlfFEUedyMUz865IzvnUTY=
        imagePullPolicy: Always
        volumeMounts:
        - mountPath: /usr/local/airflow/logs/
          mountPropagation: None
          name: airflow-logs
        - name: airflow-dags-git
          mountPath: /usr/local/airflow/dags
          readOnly: false
      - name: scheduler
        command:
        - airflow
        args:
        - scheduler
        env:
        - name: AIRFLOW__CORE__FERNET_KEY
          value: GHFnmOTWmEKuASev_ozuxmlfFEUedyMUz865IzvnUTY=
        image: pscarpino/airflow
        imagePullPolicy: Always
        name: airflow-scheduler
        volumeMounts:
        - mountPath: /usr/local/airflow/logs/
          mountPropagation: None
          name: airflow-logs
        - name: airflow-dags-git
          mountPath: /usr/local/airflow/dags
          readOnly: false
      volumes:
      - name: airflow-logs
        persistentVolumeClaim:
          claimName: airflow-logs
      - name: airflow-dags-git
        persistentVolumeClaim:
          claimName: airflow-dags-git
---
apiVersion: v1
kind: Service
metadata:
  name: airflow
spec:
  type: LoadBalancer
  ports:
  - port: 8080
  selector:
    name: airflow
When I try to run a simple DAG, a pod is created but it fails immediately. Specifically, I get the following error: "airflow.exceptions.AirflowException: dag_id could not be found: tuto. Either the dag did not exist or it failed to parse."
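The error message names two possible causes: no file defining `tuto` is visible, or the file is there but fails to parse. A rough way to tell them apart from inside a container is to walk the DAG folder and look for a literal `DAG("tuto", ...)` or `DAG(dag_id="tuto", ...)` call. This sketch assumes the folder is `/usr/local/airflow/dags` (the `dags_folder` from the config above) and that the DAG id is a string literal; DAGs created with the `@dag` decorator or a computed id will not be found. The file pre-filter only approximates Airflow's safe-mode heuristic of skipping files that do not mention both "airflow" and "dag", and all function names are hypothetical.

```python
import ast
import os


def might_contain_dag(path: str) -> bool:
    # Approximation of Airflow's safe-mode pre-filter: only files mentioning
    # both "airflow" and "dag" (case-insensitive) are treated as candidates.
    with open(path, "rb") as f:
        content = f.read().lower()
    return b"airflow" in content and b"dag" in content


def literal_dag_ids(path: str) -> set:
    """Collect dag_ids passed as string literals to DAG(...) in one file.

    Raises SyntaxError if the file is not valid Python.
    """
    with open(path, encoding="utf-8") as f:
        tree = ast.parse(f.read(), filename=path)
    ids = set()
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Matches both DAG(...) and models.DAG(...)
        name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
        if name != "DAG":
            continue
        # dag_id is the first positional argument or the dag_id= keyword
        args = node.args[:1] + [kw.value for kw in node.keywords if kw.arg == "dag_id"]
        for arg in args:
            if isinstance(arg, ast.Constant) and isinstance(arg.value, str):
                ids.add(arg.value)
    return ids


def find_dag_id(dags_folder: str, dag_id: str):
    """Return (files defining dag_id, candidate files that fail to parse)."""
    hits, unparsable = [], []
    for root, _dirs, files in os.walk(dags_folder):
        for name in sorted(files):
            path = os.path.join(root, name)
            if not name.endswith(".py") or not might_contain_dag(path):
                continue
            try:
                if dag_id in literal_dag_ids(path):
                    hits.append(path)
            except SyntaxError:
                unparsable.append(path)
    return hits, unparsable
```

For example, `find_dag_id("/usr/local/airflow/dags", "tuto")` run in the scheduler container should list the file that defines the DAG; in a container where that folder is missing or empty it returns `([], [])`.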
Inspecting the pod with kubectl describe, I noticed that it has no volume containing the DAGs:
Volumes:
  default-token-qvhrg:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-qvhrg
    Optional:    false
I expected to find the "airflow-dags-git" volume I had applied earlier.
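To compare what the Deployment requests with what the worker pod actually receives, the PVC-backed volumes and their mount paths can be read from the pod's JSON (`kubectl get pod <pod-name> -n airflow-example -o json`). The sketch below relies only on the standard Pod fields `spec.volumes[].persistentVolumeClaim.claimName` and `spec.containers[].volumeMounts`; `<pod-name>` and the helper names are placeholders.

```python
import json


def pvc_mounts(pod: dict) -> dict:
    """Map every PVC claimName in a pod (or pod template) to its mounts.

    Each mount is reported as "<container>:<mountPath>". Volumes that are not
    backed by a PersistentVolumeClaim (Secrets, emptyDir, ...) are ignored.
    """
    spec = pod.get("spec", {})
    # volume name -> claim name, for PVC-backed volumes only
    claim_by_volume = {
        vol["name"]: vol["persistentVolumeClaim"]["claimName"]
        for vol in spec.get("volumes", [])
        if "persistentVolumeClaim" in vol
    }
    mounts = {claim: [] for claim in claim_by_volume.values()}
    containers = spec.get("initContainers", []) + spec.get("containers", [])
    for container in containers:
        for mount in container.get("volumeMounts", []):
            claim = claim_by_volume.get(mount["name"])
            if claim is not None:
                mounts[claim].append(f'{container["name"]}:{mount["mountPath"]}')
    return mounts


def pvc_mounts_from_json(text: str) -> dict:
    """Same as pvc_mounts, for raw `kubectl get pod ... -o json` output."""
    return pvc_mounts(json.loads(text))
```

For a pod whose only volume is the `default-token-qvhrg` Secret shown above, `pvc_mounts` returns an empty dict, so `airflow-dags-git` is absent; applied to the Deployment's pod template (`spec.template` in `kubectl get deployment airflow -n airflow-example -o json`), the result should include `airflow-dags-git`.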
Can anyone help me figure out what the problem is?
Thanks.
Kipliko