Установите зависимости Python для воздушного потока apache

Я использую воздушный поток apache для запуска своих дагов. Я хочу установить зависимости python: requests==2.22.0

Мой файл docker для веб-сервера, планировщика и postgres:

version: "2.1"
services:
  postgres_airflow:
    image: postgres:12
    environment:
        - POSTGRES_USER=airflow
        - POSTGRES_PASSWORD=airflow
        - POSTGRES_DB=airflow
    ports:
        - "5432:5432"

  postgres_Service:
    image: postgres:12
    environment:
        - POSTGRES_USER=developer
        - POSTGRES_PASSWORD=secret
        - POSTGRES_DB=service_db
    ports:
        - "5433:5432"
 
  scheduler:
    image: apache/airflow
    restart: always
    depends_on:
      - postgres_airflow
      - postgres_Service
      - webserver
    env_file:
      - .env
    volumes:
        - ./dags:/opt/airflow/dags
    command: scheduler
    healthcheck:
        test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
        interval: 30s
        timeout: 30s
        retries: 3

  webserver:
    image: apache/airflow
    restart: always
    depends_on:
        - pg_airflow
        - pg_metadata
        - tenants-registry-api
        - metadata-api
    env_file:
      - .env
    volumes:
        - ./dags:/opt/airflow/dags
        - ./scripts:/opt/airflow/scripts
    ports:
        - "8080:8080"
    entrypoint: ./scripts/airflow-entrypoint.sh
    healthcheck:
        test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
        interval: 30s
        timeout: 30s
        retries: 3

Мой файл DAG:

import requests
from datetime import datetime

from airflow import DAG

from airflow.operators.python_operator import PythonOperator

default_args = {'owner': 'airflow',
                'start_date': datetime(2018, 1, 1)
                }

dag = DAG('download2',
          schedule_interval='0 * * * *',
          default_args=default_args,
          catchup=False)


def hello_world_py():
    requests.post(url)
    print('Hello World')


with dag:
    t1 = PythonOperator(
        task_id='download2',
        python_callable=hello_world_py,
        requirements=['requests==2.22.0'],
        provide_context=True,
        dag=dag
    )


Проблемы, с которыми я сталкиваюсь:

  1. Я не могу использовать PythonVirtualenvOperator для установки требований, так как я столкнулся с проблемой исключения файла журнала воздушного потока

  2. Я не могу использовать что-то вроде:

    build:
      args:
        PYTHON_DEPS: "requests==2.22.0"

поскольку у меня нет Dockerfile в контексте. У меня есть изображение с apavhe/airflow.

  1. Я не могу использовать монтирование тома./requirements.txt:requirements.txt в initdb, так как я не использую контейнер initdb. Я использую только команду в скрипте airflow initdb.

Любое решение трех вышеперечисленных проблем подойдет.

0 ответов

Другие вопросы по тегам