Выполнить SSH-соединение внутри python_callable Airflow?
В Airflow 1.10.10 DAG у нас есть ShortCircuitOperator
который использует функцию Python check_remote_server()
решить ветку.
В check_remote_server_data()
функция, как мы можем запустить SSH-соединение с удаленным сервером, запустить на нем команду bash и получить результаты?
Можно ли использовать соединение Airflow SSH, которое я ранее определил с помощью веб-интерфейса?
def check_remote_server_data():
pass # how can we use a predefined Airflow SSH connection, named `remote`?
with dag:
shortcircuitop = ShortCircuitOperator(
task_id='shortcircuitop',
python_callable=check_remote_server_data,
dag=dag
)
Я могу сделать это только с помощью SSHOperator
, но мне нужны результаты для определения условий короткого замыкания:
SSHOperator(
task_id='sshop',
ssh_conn_id='remote',
command='date +%F',
dag=dag)
1 ответ
Решение
В SSHOperator
поддерживает аргумент, do_xcom_push
для передачи вывода команды в виде значения XCom, к которому вы можете получить доступ в своемShortCircuitOperator
:
def check_remote_server_data(**context):
xcom_stdout = context["task_instance"].xcom_pull(task_ids="ssh_task_id")
with dag:
ssh_operator = SSHOperator(task_id="ssh_task_id", do_xcom_push=True, ...)
shortcircuitop = ShortCircuitOperator(
task_id='shortcircuitop',
python_callable=check_remote_server_data,
provide_context=True,
dag=dag
)
ssh_operator >> shortcircuitop >> [other_task0, other_task1, ...]