Создание и использование подключений в операторе Airflow во время выполнения

Примечание: это не дубликат


Я должен вызвать определенные задачи на удаленных системах из моего AirflowDAG, Прямой способ достичь этого SSHHook,

Проблема в том, что удаленная система является EMR кластер, который сам по себе создается во время выполнения (задачей верхнего уровня) с помощью EmrCreateJobFlowOperator, Так что пока я могу заполучить job_flow_id запущенного кластера EMR ( используя XCOM), что мне нужно, это ssh_conn_id быть переданным каждой последующей задаче.


Глядя на документы и код, видно, что Airflow попытается найти это соединение (используя conn_id) в переменных БД и окружения, так что теперь проблема сводится к возможности установить любое из этих двух свойств во время выполнения (изнутри operator).

Это кажется довольно распространенной проблемой, потому что если это не достижимо, то полезность EmrCreateJobFlowOperator будет сильно затруднен; но я не встречал ни одного примера, демонстрирующего это.


  • Можно ли создать (а также уничтожить) любой из них внутри оператора воздушного потока?
    1. Соединение (сохраняется в БД Airflow)
    2. Переменная среды (должна быть доступна для всех последующих задач, а не только для текущей задачи, как указано здесь)
  • Если нет, каковы мои варианты?

Я на

  • Airflow v1.10
  • Python 3.6.6
  • emr-5.15 (может обновить при необходимости)

2 ответа

Решение

Соединения приходят от ORM

Да, вы можете создавать соединения во время выполнения, даже во время создания DAG, если вы достаточно осторожны. Воздушный поток полностью прозрачен на внутренних моделях, поэтому вы можете напрямую взаимодействовать с базовым SqlAlchemy. Как видно из этого ответа, это так же просто, как:

from airflow.models import Connection
from airflow import settings

def create_conn(username, password, host=None):
    new_conn = Connection(conn_id=f'{username}_connection',
                                  login=username,
                                  host=host if host else None)
    new_conn.set_password(password)

    session = settings.Session()
    session.add(new_conn)
    session.commit()

Конечно, вы можете взаимодействовать с любыми другими дополнительными свойствами Соединения, которые могут вам потребоваться для соединения EMR.

Окружающая среда ограничена процессом

Это не ограничение Airflow или Python, но (AFAIK для каждой крупной ОС) среды связаны с временем жизни процесса. Когда ты export переменная в bash, например, вы просто утверждаете, что когда вы порождаете дочерние процессы, вы хотите скопировать эту переменную в дочернюю среду. Это означает, что родительский процесс не может изменить среду ребенка после своего создания, и ребенок не может изменить среду родителей.

Короче говоря, только сам процесс может изменить свою среду после его создания. И учитывая, что рабочий процесс является подпроцессом Airflow, также сложно контролировать создание их сред. Что вы можете сделать, это записать переменные среды в файл и преднамеренно обновлять текущую среду с помощью переопределений из этого файла при каждом запуске задачи.

Вы можете сделать это, создав задачу Airflow после EmrCreateJobFlowOperator, который использует BashOperator, чтобы, вероятно, использовать aws-cli для получения IP-адреса виртуальной машины, на которой вы хотите запустить задачу, и в той же задаче запустить airflow cli, который создает SSH-соединение с использованием этого IP-адреса.

Другие вопросы по тегам