Структура воздушного потока / организация задач и задач

Мои вопросы:

  • Что такое хорошая структура каталогов для организации ваших задач и задач? (примеры дагс показывают только пару задач)
  • В настоящее время у меня есть свои dags в корне папки dags и мои задачи в отдельных каталогах, не уверен, как это сделать?
  • Должны ли мы использовать ZIP-файлы? https://github.com/apache/incubator-airflow/blob/a1f4227bee1a70531cfa90769149322513cb6f92/airflow/models.py

3 ответа

Решение

Я хотел бы сравнить структуру папок с другими людьми. Возможно, это будет зависеть от того, для чего вы используете Airflow, но я поделюсь своим примером. Я делаю конвейеры данных для создания хранилища данных, поэтому на высоком уровне у меня в основном два шага:

  1. Сбросить много данных в озеро данных (напрямую доступное только нескольким людям)
  2. Загрузка данных из озера данных в аналитическую базу данных, где данные будут смоделированы и предоставлены приложениям панели мониторинга (много SQL-запросов для моделирования данных)

Сегодня я организовал файлы в три основные папки, которые пытаются отразить логику выше:

├── dags
│   ├── dag_1.py
│   └── dag_2.py
├── data-lake
│   ├── data-source-1
│   └── data-source-2
└── dw
    ├── cubes
    │   ├── cube_1.sql
    │   └── cube_2.sql
    ├── dims
    │   ├── dim_1.sql
    │   └── dim_2.sql
    └── facts
        ├── fact_1.sql
        └── fact_2.sql

Это более или менее моя основная структура папок.

Я использую что-то вроде этого.

  • Проект обычно является чем-то совершенно отдельным или уникальным. Возможно, группы обеспечения доступности баз данных для обработки файлов, которые мы получаем от определенного клиента, которые совершенно не связаны со всем остальным (почти наверняка, с отдельной схемой базы данных).
  • У меня есть операторы, хуки и некоторые вспомогательные скрипты (удаляют все данные Airflow для определенного DAG и т. Д.) В общей папке
  • Раньше у меня был один репозиторий git для всей папки Airflow, но теперь у меня есть отдельный git для каждого проекта (делает его более организованным и проще предоставлять разрешения для Gitlab, поскольку проекты не связаны между собой). Это означает, что каждая папка проекта также как.git и.gitignore, и т. Д.
  • Я склонен сохранять необработанные данные, а затем "отдыхать" измененную копию данных, которая является именно тем, что копируется в базу данных. Мне приходится сильно изменять некоторые необработанные данные из-за разных форматов от разных клиентов (Excel, веб-очистка, HTML-очистка электронной почты, плоские файлы, запросы из SalesForce или других источников базы данных...)

Пример дерева:

├───dags
│   ├───common
│   │   ├───hooks
│   │   │       pysftp_hook.py
│   │   │
│   │   ├───operators
│   │   │       docker_sftp.py
│   │   │       postgres_templated_operator.py
│   │   │
│   │   └───scripts
│   │           delete.py
│   │
│   ├───project_1
│   │   │   dag_1.py
│   │   │   dag_2.py
│   │   │
│   │   └───sql
│   │           dim.sql
│   │           fact.sql
│   │           select.sql
│   │           update.sql
│   │           view.sql
│   │
│   └───project_2
│       │   dag_1.py
│       │   dag_2.py
│       │
│       └───sql
│               dim.sql
│               fact.sql
│               select.sql
│               update.sql
│               view.sql
│
└───data
    ├───project_1
    │   ├───modified
    │   │       file_20180101.csv
    │   │       file_20180102.csv
    │   │
    │   └───raw
    │           file_20180101.csv
    │           file_20180102.csv
    │
    └───project_2
        ├───modified
        │       file_20180101.csv
        │       file_20180102.csv
        │
        └───raw
                file_20180101.csv
                file_20180102.csv

Я использую Google Cloud Composer. Мне нужно управлять несколькими проектами с помощью некоторых дополнительных сценариев SQL, и я хочу синхронизировать все черезgsutil rsyncПоэтому я использую следующую структуру:

      ├───dags
│   │
│   ├───project_1
│       │
│       ├───dag_bag.py
│       │
│       ├───.airflowignore
│       │
│       ├───dag_1
│       │      dag.py
│       │      script.sql
│
├───plugins
│   │
│   ├───hooks
│   │      hook_1.py
│   │   
│   ├───sensors
│   │      sensor_1.py
│   │
│   ├───operators
│   │      operator_1.py

И файлdag_bag.pyсодержит эти строки

      from airflow.models import DagBag

dag_bag = DagBag(dag_folder="/home/airflow/gcs/dags/project_1", include_examples=False)
Другие вопросы по тегам