Создание и использование подключений в операторе Airflow во время выполнения
Примечание: это не дубликат
- Экспорт переменных среды во время выполнения с потоком воздуха
- Задать переменные потока воздуха во время выполнения
Я должен вызвать определенные задачи на удаленных системах из моего Airflow
DAG
, Прямой способ достичь этого SSHHook
,
Проблема в том, что удаленная система является EMR
кластер, который сам по себе создается во время выполнения (задачей верхнего уровня) с помощью EmrCreateJobFlowOperator
, Так что пока я могу заполучить job_flow_id
запущенного кластера EMR ( используя XCOM
), что мне нужно, это ssh_conn_id
быть переданным каждой последующей задаче.
Глядя на документы и код, видно, что Airflow попытается найти это соединение (используя conn_id
) в переменных БД и окружения, так что теперь проблема сводится к возможности установить любое из этих двух свойств во время выполнения (изнутри operator
).
Это кажется довольно распространенной проблемой, потому что если это не достижимо, то полезность EmrCreateJobFlowOperator
будет сильно затруднен; но я не встречал ни одного примера, демонстрирующего это.
- Можно ли создать (а также уничтожить) любой из них внутри оператора воздушного потока?
- Соединение (сохраняется в БД Airflow)
- Переменная среды (должна быть доступна для всех последующих задач, а не только для текущей задачи, как указано здесь)
- Если нет, каковы мои варианты?
Я на
Airflow v1.10
Python 3.6.6
emr-5.15
(может обновить при необходимости)
2 ответа
Соединения приходят от ORM
Да, вы можете создавать соединения во время выполнения, даже во время создания DAG, если вы достаточно осторожны. Воздушный поток полностью прозрачен на внутренних моделях, поэтому вы можете напрямую взаимодействовать с базовым SqlAlchemy. Как видно из этого ответа, это так же просто, как:
from airflow.models import Connection
from airflow import settings
def create_conn(username, password, host=None):
new_conn = Connection(conn_id=f'{username}_connection',
login=username,
host=host if host else None)
new_conn.set_password(password)
session = settings.Session()
session.add(new_conn)
session.commit()
Конечно, вы можете взаимодействовать с любыми другими дополнительными свойствами Соединения, которые могут вам потребоваться для соединения EMR.
Окружающая среда ограничена процессом
Это не ограничение Airflow или Python, но (AFAIK для каждой крупной ОС) среды связаны с временем жизни процесса. Когда ты export
переменная в bash, например, вы просто утверждаете, что когда вы порождаете дочерние процессы, вы хотите скопировать эту переменную в дочернюю среду. Это означает, что родительский процесс не может изменить среду ребенка после своего создания, и ребенок не может изменить среду родителей.
Короче говоря, только сам процесс может изменить свою среду после его создания. И учитывая, что рабочий процесс является подпроцессом Airflow, также сложно контролировать создание их сред. Что вы можете сделать, это записать переменные среды в файл и преднамеренно обновлять текущую среду с помощью переопределений из этого файла при каждом запуске задачи.
Вы можете сделать это, создав задачу Airflow после EmrCreateJobFlowOperator
, который использует BashOperator, чтобы, вероятно, использовать aws-cli для получения IP-адреса виртуальной машины, на которой вы хотите запустить задачу, и в той же задаче запустить airflow cli, который создает SSH-соединение с использованием этого IP-адреса.