Выполнить SSH-соединение внутри python_callable Airflow?

В Airflow 1.10.10 DAG у нас есть ShortCircuitOperator который использует функцию Python check_remote_server() решить ветку.

В check_remote_server_data() функция, как мы можем запустить SSH-соединение с удаленным сервером, запустить на нем команду bash и получить результаты?

Можно ли использовать соединение Airflow SSH, которое я ранее определил с помощью веб-интерфейса?

def check_remote_server_data():
    pass   # how can we use a predefined Airflow SSH connection, named `remote`?

with dag:
    shortcircuitop = ShortCircuitOperator(
        task_id='shortcircuitop',
        python_callable=check_remote_server_data,
        dag=dag
    )

Я могу сделать это только с помощью SSHOperator, но мне нужны результаты для определения условий короткого замыкания:

SSHOperator(
    task_id='sshop',
    ssh_conn_id='remote',
    command='date +%F',
dag=dag)

1 ответ

Решение

В SSHOperator поддерживает аргумент, do_xcom_push для передачи вывода команды в виде значения XCom, к которому вы можете получить доступ в своемShortCircuitOperator:

def check_remote_server_data(**context):
    xcom_stdout = context["task_instance"].xcom_pull(task_ids="ssh_task_id")

with dag:
    ssh_operator = SSHOperator(task_id="ssh_task_id", do_xcom_push=True, ...)

    shortcircuitop = ShortCircuitOperator(
        task_id='shortcircuitop',
        python_callable=check_remote_server_data,
        provide_context=True,
        dag=dag
    )

    ssh_operator >> shortcircuitop >> [other_task0, other_task1, ...]
Другие вопросы по тегам