(Джанго) ОРМ в потоке воздуха - возможно ли это?

Как работать с моделями Django в задачах Airflow?

Согласно официальной документации Airflow, Airflow предоставляет хуки для взаимодействия с базами данных (например, MySqlHook / PostgresHook / etc), которые впоследствии могут быть использованы в Операторах для выполнения запросов строк. Прикрепление фрагментов основного кода:

Скопируйте из https://airflow.apache.org/_modules/mysql_hook.html

class MySqlHook(DbApiHook):
    conn_name_attr = 'mysql_conn_id'
    default_conn_name = 'mysql_default'
    supports_autocommit = True

    def get_conn(self):
        """
        Returns a mysql connection object
        """
        conn = self.get_connection(self.mysql_conn_id)
        conn_config = {
            "user": conn.login,
            "passwd": conn.password or ''
        }
        conn_config["host"] = conn.host or 'localhost'
        conn_config["db"] = conn.schema or ''
        conn = MySQLdb.connect(**conn_config)
        return conn

Скопируйте из https://airflow.apache.org/_modules/mysql_operator.html

class MySqlOperator(BaseOperator):
    @apply_defaults
    def __init__(
            self, sql, mysql_conn_id='mysql_default', parameters=None,
            autocommit=False, *args, **kwargs):
        super(MySqlOperator, self).__init__(*args, **kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        logging.info('Executing: ' + str(self.sql))
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        hook.run(
            self.sql,
            autocommit=self.autocommit,
            parameters=self.parameters)

Как мы видим, Hook инкапсулирует конфигурацию соединения, в то время как Operator предоставляет возможность выполнять пользовательские запросы.

Эта проблема:

Очень удобно использовать разные ORM для выборки и обработки объектов базы данных вместо исходного SQL по следующим причинам:

  1. В простых случаях ORM может быть гораздо более удобным решением, см. Определения ORM.
  2. Предположим, что уже существуют такие системы, как Django с определенными моделями и их методами. Каждый раз, когда меняются схемы этих моделей, необработанные SQL-запросы необходимо переписывать. ORM предоставляет унифицированный интерфейс для работы с такими моделями.

Почему-то нет примеров работы с ORM в задачах Airflow в терминах ловушек и операторов. В соответствии с использованием слоя базы данных Django вне Django? вопрос, необходимо настроить конфигурацию соединения с базой данных, а затем напрямую выполнить очереди в ORM, но выполнение этого вне соответствующих перехватчиков / операторов нарушает принципы Airflow. Это похоже на вызов BashOperator с командой "python work_with_django_models.py".

Наконец, мы хотим это:

Итак, каковы лучшие практики в этом случае? Разделяем ли мы какие-либо хуки / операторы для Django ORM / других ORM? Для того, чтобы следующий код был реальным (рассматривается как псевдокод!):

import os
import django
os.environ.setdefault(
    "DJANGO_SETTINGS_MODULE",
    "myapp.settings"
)
django.setup()
from your_app import models

def get_and_modify_models(ds, **kwargs):
    all_objects = models.MyModel.objects.filter(my_str_field = 'abc')
    all_objects[15].my_int_field = 25
    all_objects[15].save()
    return list(all_objects)

django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')

вместо реализации этой функциональности в сыром SQL.

Я думаю, что это довольно важная тема, так как в этом случае целая часть основанных на ORM фреймворков и процессов не может погрузиться в Airflow.

Заранее спасибо!

1 ответ

Решение

Я согласен, что мы должны продолжать эту дискуссию, поскольку доступ к Django ORM может значительно снизить сложность решений.

Мой подход был 1) создать DjangoOperator

import os, sys

from airflow.models import BaseOperator


def setup_django_for_airflow():
    # Add Django project root to path
    sys.path.append('./project_root/')

    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")

    import django
    django.setup()


class DjangoOperator(BaseOperator):

    def pre_execute(self, *args, **kwargs):
        setup_django_for_airflow()

и 2) Расширить этот DjangoOperator для логики / операторов, что выиграет от доступа к ORM

from .base import DjangoOperator


class DjangoExampleOperator(DjangoOperator):

    def execute(self, context):
        from myApp.models import model
        model.objects.get_or_create()

С помощью этой стратегии вы сможете различать операторы, использующие Raw SQL / ORM. Также обратите внимание, что для оператора Django все импорты моделей django должны находиться в контексте выполнения, как показано выше.

Другие вопросы по тегам