(Джанго) ОРМ в потоке воздуха - возможно ли это?
Как работать с моделями Django в задачах Airflow?
Согласно официальной документации Airflow, Airflow предоставляет хуки для взаимодействия с базами данных (например, MySqlHook / PostgresHook / etc), которые впоследствии могут быть использованы в Операторах для выполнения запросов строк. Прикрепление фрагментов основного кода:
Скопируйте из https://airflow.apache.org/_modules/mysql_hook.html
class MySqlHook(DbApiHook):
conn_name_attr = 'mysql_conn_id'
default_conn_name = 'mysql_default'
supports_autocommit = True
def get_conn(self):
"""
Returns a mysql connection object
"""
conn = self.get_connection(self.mysql_conn_id)
conn_config = {
"user": conn.login,
"passwd": conn.password or ''
}
conn_config["host"] = conn.host or 'localhost'
conn_config["db"] = conn.schema or ''
conn = MySQLdb.connect(**conn_config)
return conn
Скопируйте из https://airflow.apache.org/_modules/mysql_operator.html
class MySqlOperator(BaseOperator):
@apply_defaults
def __init__(
self, sql, mysql_conn_id='mysql_default', parameters=None,
autocommit=False, *args, **kwargs):
super(MySqlOperator, self).__init__(*args, **kwargs)
self.mysql_conn_id = mysql_conn_id
self.sql = sql
self.autocommit = autocommit
self.parameters = parameters
def execute(self, context):
logging.info('Executing: ' + str(self.sql))
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
hook.run(
self.sql,
autocommit=self.autocommit,
parameters=self.parameters)
Как мы видим, Hook инкапсулирует конфигурацию соединения, в то время как Operator предоставляет возможность выполнять пользовательские запросы.
Эта проблема:
Очень удобно использовать разные ORM для выборки и обработки объектов базы данных вместо исходного SQL по следующим причинам:
- В простых случаях ORM может быть гораздо более удобным решением, см. Определения ORM.
- Предположим, что уже существуют такие системы, как Django с определенными моделями и их методами. Каждый раз, когда меняются схемы этих моделей, необработанные SQL-запросы необходимо переписывать. ORM предоставляет унифицированный интерфейс для работы с такими моделями.
Почему-то нет примеров работы с ORM в задачах Airflow в терминах ловушек и операторов. В соответствии с использованием слоя базы данных Django вне Django? вопрос, необходимо настроить конфигурацию соединения с базой данных, а затем напрямую выполнить очереди в ORM, но выполнение этого вне соответствующих перехватчиков / операторов нарушает принципы Airflow. Это похоже на вызов BashOperator с командой "python work_with_django_models.py".
Наконец, мы хотим это:
Итак, каковы лучшие практики в этом случае? Разделяем ли мы какие-либо хуки / операторы для Django ORM / других ORM? Для того, чтобы следующий код был реальным (рассматривается как псевдокод!):
import os
import django
os.environ.setdefault(
"DJANGO_SETTINGS_MODULE",
"myapp.settings"
)
django.setup()
from your_app import models
def get_and_modify_models(ds, **kwargs):
all_objects = models.MyModel.objects.filter(my_str_field = 'abc')
all_objects[15].my_int_field = 25
all_objects[15].save()
return list(all_objects)
django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')
вместо реализации этой функциональности в сыром SQL.
Я думаю, что это довольно важная тема, так как в этом случае целая часть основанных на ORM фреймворков и процессов не может погрузиться в Airflow.
Заранее спасибо!
1 ответ
Я согласен, что мы должны продолжать эту дискуссию, поскольку доступ к Django ORM может значительно снизить сложность решений.
Мой подход был 1) создать DjangoOperator
import os, sys
from airflow.models import BaseOperator
def setup_django_for_airflow():
# Add Django project root to path
sys.path.append('./project_root/')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
import django
django.setup()
class DjangoOperator(BaseOperator):
def pre_execute(self, *args, **kwargs):
setup_django_for_airflow()
и 2) Расширить этот DjangoOperator для логики / операторов, что выиграет от доступа к ORM
from .base import DjangoOperator
class DjangoExampleOperator(DjangoOperator):
def execute(self, context):
from myApp.models import model
model.objects.get_or_create()
С помощью этой стратегии вы сможете различать операторы, использующие Raw SQL / ORM. Также обратите внимание, что для оператора Django все импорты моделей django должны находиться в контексте выполнения, как показано выше.