Как получить значение из Airflow XCom, отправленного через SSHExecuteOperator
У меня есть следующая группа доступности базы данных с двумя задачами SSHExecuteOperator. Первая задача выполняет хранимую процедуру, которая возвращает параметр. Вторая задача нуждается в этом параметре в качестве входных данных.
Не могли бы вы объяснить, как извлечь значение из XCom, введенного в task1, чтобы использовать его в task2?
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.now(),
'email': ['my@email.com'],
'email_on_failure': True,
'retries': 0
}
#server must be changed to point to the correct environment, to do so update DataQualitySSHHook variable in Airflow admin
DataQualitySSHHook = Variable.get('DataQualitySSHHook')
print('Connecting to: ' + DataQualitySSHHook)
sshHookEtl = SSHHook(conn_id=DataQualitySSHHook)
sshHookEtl.no_host_key_check = True
#create dag
dag = DAG(
'ed_data_quality_test-v0.0.3', #update version whenever you change something
default_args=default_args,
schedule_interval="0 0 * * *",
dagrun_timeout=timedelta(hours=24),
max_active_runs=1)
#create tasks
task1 = SSHExecuteOperator(
task_id='run_remote_sp_audit_batch_register',
bash_command="bash /opt/scripts/data_quality/EXEC_SP_AUDIT_BATCH.sh 'ED_DATA_QUALITY_MANUAL' 'REGISTER' '1900-01-01 00:00:00.000000' '2999-12-31 00:00:00.000000' ", #keep the space at the end
ssh_hook=sshHookEtl,
xcom_push=True,
retries=0,
dag=dag)
task2 = SSHExecuteOperator(
task_id='run_remote_sp_audit_module_session_start',
bash_command="echo {{ ti.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}",
ssh_hook=sshHookEtl,
retries=0,
dag=dag)
#create dependencies
task1.set_downstream(task2)
1 ответ
Таким образом, решение, которое я нашел, заключается в том, что когда task1 выполняет сценарий оболочки, вы должны убедиться, что параметр, который вы хотите захватить переменной XCom, является последним, что напечатано вашим сценарием (с использованием echo).
Затем я смог получить значение переменной XCom с помощью следующего фрагмента кода:
{{ task_instance.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}
insted of xcom_push=True, попробуйте do_xcom_push=True, он перенесет весь стандартный вывод в xcom с ключом return_value