Структура воздушного потока / организация задач и задач
Мои вопросы:
- Что такое хорошая структура каталогов для организации ваших задач и задач? (примеры дагс показывают только пару задач)
- В настоящее время у меня есть свои dags в корне папки dags и мои задачи в отдельных каталогах, не уверен, как это сделать?
- Должны ли мы использовать ZIP-файлы? https://github.com/apache/incubator-airflow/blob/a1f4227bee1a70531cfa90769149322513cb6f92/airflow/models.py
3 ответа
Решение
Я хотел бы сравнить структуру папок с другими людьми. Возможно, это будет зависеть от того, для чего вы используете Airflow, но я поделюсь своим примером. Я делаю конвейеры данных для создания хранилища данных, поэтому на высоком уровне у меня в основном два шага:
- Сбросить много данных в озеро данных (напрямую доступное только нескольким людям)
- Загрузка данных из озера данных в аналитическую базу данных, где данные будут смоделированы и предоставлены приложениям панели мониторинга (много SQL-запросов для моделирования данных)
Сегодня я организовал файлы в три основные папки, которые пытаются отразить логику выше:
├── dags
│ ├── dag_1.py
│ └── dag_2.py
├── data-lake
│ ├── data-source-1
│ └── data-source-2
└── dw
├── cubes
│ ├── cube_1.sql
│ └── cube_2.sql
├── dims
│ ├── dim_1.sql
│ └── dim_2.sql
└── facts
├── fact_1.sql
└── fact_2.sql
Это более или менее моя основная структура папок.
Я использую что-то вроде этого.
- Проект обычно является чем-то совершенно отдельным или уникальным. Возможно, группы обеспечения доступности баз данных для обработки файлов, которые мы получаем от определенного клиента, которые совершенно не связаны со всем остальным (почти наверняка, с отдельной схемой базы данных).
- У меня есть операторы, хуки и некоторые вспомогательные скрипты (удаляют все данные Airflow для определенного DAG и т. Д.) В общей папке
- Раньше у меня был один репозиторий git для всей папки Airflow, но теперь у меня есть отдельный git для каждого проекта (делает его более организованным и проще предоставлять разрешения для Gitlab, поскольку проекты не связаны между собой). Это означает, что каждая папка проекта также как.git и.gitignore, и т. Д.
- Я склонен сохранять необработанные данные, а затем "отдыхать" измененную копию данных, которая является именно тем, что копируется в базу данных. Мне приходится сильно изменять некоторые необработанные данные из-за разных форматов от разных клиентов (Excel, веб-очистка, HTML-очистка электронной почты, плоские файлы, запросы из SalesForce или других источников базы данных...)
Пример дерева:
├───dags
│ ├───common
│ │ ├───hooks
│ │ │ pysftp_hook.py
│ │ │
│ │ ├───operators
│ │ │ docker_sftp.py
│ │ │ postgres_templated_operator.py
│ │ │
│ │ └───scripts
│ │ delete.py
│ │
│ ├───project_1
│ │ │ dag_1.py
│ │ │ dag_2.py
│ │ │
│ │ └───sql
│ │ dim.sql
│ │ fact.sql
│ │ select.sql
│ │ update.sql
│ │ view.sql
│ │
│ └───project_2
│ │ dag_1.py
│ │ dag_2.py
│ │
│ └───sql
│ dim.sql
│ fact.sql
│ select.sql
│ update.sql
│ view.sql
│
└───data
├───project_1
│ ├───modified
│ │ file_20180101.csv
│ │ file_20180102.csv
│ │
│ └───raw
│ file_20180101.csv
│ file_20180102.csv
│
└───project_2
├───modified
│ file_20180101.csv
│ file_20180102.csv
│
└───raw
file_20180101.csv
file_20180102.csv
Я использую Google Cloud Composer. Мне нужно управлять несколькими проектами с помощью некоторых дополнительных сценариев SQL, и я хочу синхронизировать все черезgsutil rsync
Поэтому я использую следующую структуру:
├───dags
│ │
│ ├───project_1
│ │
│ ├───dag_bag.py
│ │
│ ├───.airflowignore
│ │
│ ├───dag_1
│ │ dag.py
│ │ script.sql
│
├───plugins
│ │
│ ├───hooks
│ │ hook_1.py
│ │
│ ├───sensors
│ │ sensor_1.py
│ │
│ ├───operators
│ │ operator_1.py
И файлdag_bag.py
содержит эти строки
from airflow.models import DagBag
dag_bag = DagBag(dag_folder="/home/airflow/gcs/dags/project_1", include_examples=False)