Ошибка Airflow Kubernetes Executor (не удалось найти dag_id)

Пытаюсь сделать первые шаги с помощью Airflow (решил начать с последней выпущенной версии, 2.1.0).

Ниже вы можете найти шаги, которые я выполнил.
Я хочу указать, что я запускаю тест локально с помощью Minikube.

Я начал создавать свой собственный образ, используя следующий файл Dockerfile:

      FROM python:3.8-slim
LABEL maintainer="Paolo"

ENV DEBIAN_FRONTEND noninteractive
ENV TERM linux

# Airflow
ARG AIRFLOW_VERSION=2.1.0
ARG AIRFLOW_USER_HOME=/usr/local/airflow
ENV AIRFLOW_HOME=${AIRFLOW_USER_HOME}

# Define en_US.
ENV LANGUAGE en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LC_ALL en_US.UTF-8
ENV LC_CTYPE en_US.UTF-8
ENV LC_MESSAGES en_US.UTF-8

COPY ./constraints-3.8.txt /constraints-3.8.txt

RUN set -ex \
    && buildDeps=' \
        freetds-dev \
        libkrb5-dev \
        libsasl2-dev \
        libssl-dev \
        libffi-dev \
        libpq-dev \
        git \
    ' \
    && apt-get update -yqq \
    && apt-get upgrade -yqq \
    && apt-get install -yqq --no-install-recommends \
        $buildDeps \
        freetds-bin \
        build-essential \
        default-libmysqlclient-dev \
        apt-utils \
        curl \
        vim \
        rsync \
        netcat \
        locales \
    && sed -i 's/^# en_US.UTF-8 UTF-8$/en_US.UTF-8 UTF-8/g' /etc/locale.gen \
    && locale-gen \
    && update-locale LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 \
    && useradd -ms /bin/bash -d ${AIRFLOW_USER_HOME} airflow \
    && pip install --upgrade "pip==20.2.4" \
    && pip install -U pip setuptools wheel \
    && pip install pytz \
    && pip install pyOpenSSL \
    && pip install ndg-httpsclient \
    && pip install pyasn1 \
    && pip install apache-airflow[kubernetes,postgres,ssh]==${AIRFLOW_VERSION} --constraint constraints-3.8.txt \
    && pip install 'redis==3.2' \
    && apt-get purge --auto-remove -yqq $buildDeps \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf \
        /var/lib/apt/lists/* \
        /tmp/* \
        /var/tmp/* \
        /usr/share/man \
        /usr/share/doc \
        /usr/share/doc-base


COPY config/airflow.cfg ${AIRFLOW_USER_HOME}/airflow.cfg
COPY script/entrypoint.sh ${AIRFLOW_USER_HOME}/entrypoint.sh

RUN chown -R airflow: ${AIRFLOW_USER_HOME} && chmod -R 775 ${AIRFLOW_USER_HOME}

EXPOSE 8080 5555 8793

USER airflow
WORKDIR ${AIRFLOW_USER_HOME}

( Скрипт точки входа просто запускает планировщик двух прецессов и веб-сервер. )

И используя следующий файл airflow.cfg .

      
[core]
dags_folder = /usr/local/airflow/dags
base_log_folder = /usr/local/airflow/logs
logging_level = DEBUG
executor = KubernetesExecutor
parallelism = 32
load_examples = False
fernet_key = GHFnmOTWmEKuASev_ozuxmlfFEUedyMUz865IzvnUTY=
plugins_folder = /usr/local/airflow/plugins
sql_alchemy_conn = postgresql://root:root@postgres-airflow:5432/airflow

[scheduler]
dag_dir_list_interval = 300
child_process_log_directory = /usr/local/airflow/logs/scheduler
job_heartbeat_sec = 5
max_threads = 2

# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5

# after how much time a new DAGs should be picked up from the filesystem
min_file_process_interval = 0

statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

# How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
min_file_parsing_loop_time = 1

print_stats_interval = 30
scheduler_zombie_task_threshold = 300
max_tis_per_query = 0
authenticate = False

# Turn off scheduler catchup by setting this to False.
# Default behavior is unchanged and
# Command Line Backfills still work, but the scheduler
# will not do scheduler catchup if this is False,
# however it can be set on a per DAG basis in the
# DAG definition (catchup)
catchup_by_default = True

[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://0.0.0.0:8000
rbac=True

# The ip specified when starting the web server
web_server_host = 0.0.0.0

# The port on which to run the web server
web_server_port = 8000

# Paths to the SSL certificate and key for the web server. When both are
# provided SSL will be enabled. This does not change the web server port.
web_server_ssl_cert =
web_server_ssl_key =

# Number of seconds the webserver waits before killing gunicorn master that doesn't respond
web_server_master_timeout = 120

# Number of seconds the gunicorn webserver waits before timing out on a worker
web_server_worker_timeout = 120

# Number of workers to refresh at a time. When set to 0, worker refresh is
# disabled. When nonzero, airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1

# Number of seconds to wait before refreshing a batch of workers.
worker_refresh_interval = 30

# Secret key used to run your flask app
secret_key = GHFnmOTWmEKuASev_ozuxmlfFEUedyMUz

# Number of workers to run the Gunicorn web server
workers = 4

# The worker class gunicorn should use. Choices include
# sync (default), eventlet, gevent
worker_class = sync

# Log files for the gunicorn webserver. '-' means log to stderr.
access_logfile = -
error_logfile = -

# Expose the configuration file in the web server
expose_config = False

# Default DAG view.  Valid values are:
# tree, graph, duration, gantt, landing_times
dag_default_view = tree

# Default DAG orientation. Valid values are:
# LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
dag_orientation = LR

# Puts the webserver in demonstration mode; blurs the names of Operators for
# privacy.
demo_mode = False

# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
log_fetch_timeout_sec = 5

# By default, the webserver shows paused DAGs. Flip this to hide paused
# DAGs by default
hide_paused_dags_by_default = False

# Consistent page size across all listing views in the UI
page_size = 100


[kubernetes]
#airflow_configmap = airflow-configmap
worker_container_repository = pscarpino/airflow
worker_container_tag = latest
worker_container_image_pull_policy = Always
worker_service_account_name = airflow
namespace = airflow-example
delete_worker_pods = True
dags_in_image = True
git_repo = https://github.com/kipliko/airflow.git
git_branch = main
#git_subpath = 
#git_user =
#git_password =
#git_sync_root = 
#git_sync_path = 
git_dags_folder_mount_point = /usr/local/airflow/dags
dags_volume_claim = airflow-dags-git
dags_volume_subpath = 
logs_volume_claim = airflow-logs
#logs_volume_subpath =
#dags_volume_host =
#logs_volume_host =
in_cluster = True
#gcp_service_account_keys =

# Example affinity and toleration definitions.
affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]

# For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
git_sync_container_tag = v2.0.5
git_sync_init_container_name = git-sync-clone

[kubernetes_secrets]
SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn


[cli]
api_client = airflow.api.client.json_client
endpoint_url = http://0.0.0.0:8080

[api]
auth_backend = airflow.api.auth.backend.default

[github_enterprise]
api_rev = v3

[admin]
# UI to hide sensitive variable fields when set to True
hide_sensitive_variable_fields = True

Что касается развертывания, как я уже сказал, я использую minikube со следующим файлом yaml.

( PersistentVolume и PersistentVolumeClaim « airflow-dags-git » были ранее созданы с использованием другого файла )

      
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow
  namespace: airflow-example
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow-example
  name: airflow
rules:
- apiGroups: [""] # "" indicates the core API group
  resources: ["pods", "pods/log"]
  verbs: ["get", "list", "watch", "create", "update", "delete"]
- apiGroups: ["batch", "extensions"]
  resources: ["jobs"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---  
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow
  namespace: airflow-example
subjects:
- kind: ServiceAccount
  name: airflow # Name of the ServiceAccount
  namespace: airflow-example
roleRef:
  kind: Role # This must be Role or ClusterRole
  name: airflow # This must match the name of the Role or ClusterRole you wish to bind to
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow-example
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      serviceAccountName: airflow
      initContainers:
        - name: init
          image: pscarpino/airflow
          imagePullPolicy: Always
          command:
          - "bash"
          args:
            - "-cx"
            - "/usr/local/airflow/entrypoint.sh"
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: GHFnmOTWmEKuASev_ozuxmlfFEUedyMUz865IzvnUTY=

      containers:
        
        - name: webserver
          image: pscarpino/airflow
          command: 
            - airflow
          args:
            - webserver
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: GHFnmOTWmEKuASev_ozuxmlfFEUedyMUz865IzvnUTY=
          imagePullPolicy: Always
          volumeMounts:
            - mountPath: /usr/local/airflow/logs/
              mountPropagation: None
              name: airflow-logs
            - name: airflow-dags-git
              mountPath: /usr/local/airflow/dags
              readOnly: false

        - name: scheduler
          command: 
            - airflow
          args:
            - scheduler
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: GHFnmOTWmEKuASev_ozuxmlfFEUedyMUz865IzvnUTY=
          image: pscarpino/airflow
          imagePullPolicy: Always
          name: airflow-scheduler
          volumeMounts:
            - mountPath: /usr/local/airflow/logs/
              mountPropagation: None
              name: airflow-logs
            - name: airflow-dags-git
              mountPath: /usr/local/airflow/dags
              readOnly: false
            
      volumes:
        - name: airflow-logs
          persistentVolumeClaim:
            claimName: airflow-logs
        - name: airflow-dags-git
          persistentVolumeClaim:
            claimName: airflow-dags-git

---
apiVersion: v1
kind: Service
metadata:
  name: airflow
spec:
  type: LoadBalancer
  ports:
    - port: 8080
  selector:
    name: airflow

Когда я пытаюсь запустить простой DAG, создается экземпляр модуля, но сразу же выходит из строя. В частности, я получаю следующую ошибку: « airflow.exceptions.AirflowException: dag_id не может быть найден: tuto. Либо dag не существует, либо его не удалось проанализировать ».

Анализируя структуру с помощью kubectl describe, я заметил, что в модуле нет тома, содержащего DAG.

      Volumes:
 default-token-qvhrg:
   Type:        Secret (a volume populated by a Secret)
   SecretName:  default-token-qvhrg
   Optional:    false

Я ожидал найти то, что применял до " airflow-dags-git ".

Может ли кто-нибудь помочь мне выяснить, в чем проблема?

Спасибо.

Киплико

0 ответов

Другие вопросы по тегам