Я не могу xcom_push аргументы через BashOperator
Я новичок в Xcom функции Airflow. я попробовал это с PythonOperator, и он работал нормально (то есть я могу выдвинуть и извлечь значение из контекста), но когда я попробовал это на BashOperator, это не сработало. Однако я могу получить только последний оператор stdout, добавив атрибут xcom_push=True во время создания задачи. это одна вещь. 2) Но я также хотел бы выдвигать и извлекать значения, основанные на их ключах (в BashOp и обратно), как мы это делаем в PythonOp. Это было бы очень полезно, так как мне нужно передать тонны переменных из одного скрипта в другой.
3 ответа
Это то, что вы хотите?
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG(
dag_id="example_bash_operator_1",
schedule_interval=None,
start_date=datetime(2018, 12, 31),
)
t1 = BashOperator(
task_id="t1",
bash_command='echo "{{ ti.xcom_push(key="k1", value="v1") }}" "{{ti.xcom_push(key="k2", value="v2") }}"',
dag=dag,
)
t2 = BashOperator(
task_id="t2",
bash_command='echo "{{ ti.xcom_pull(key="k1") }}" "{{ ti.xcom_pull(key="k2") }}"',
dag=dag,
)
t1 >> t2
@SpaceyBot и Лукас ответили на ваш первый вопрос. По поводу второго поднятого вопроса
Цитата
2) Но я также хочу нажимать и извлекать значения на основе их ключей (в и из BashOp), как мы это делаем в PythonOp. Это было бы действительно полезно, поскольку мне нужно передать тонны переменных из одного скрипта в еще один.
Цитата
- это не рекомендуется. Все действия XCom pull/push транслируются в операторы Insert/Select в базе данных воздушного потока. Это снизит производительность планировщика со временем и замедлит всю обработку либо из-за большого количества запускаемых запросов (запросов), либо из-за большого количества извлекаемых строк, которые будут извлечены посредством сканирования полной таблицы вместо сканирования на основе индекса.
Поэтому здесь лучше рассмотреть другой механизм - хранение информации во внешних файлах json/csv/txt /.. и т. Д.
Итог - XCom предназначен для передачи только небольших объемов данных, в основном счетчиков и переменных состояния.
В дополнение к ответу @Ryan Yuan вы можете использовать параметр
env
принадлежащий
BashOperator
чтобы установить переменные среды для вашего скрипта/команды bash.
my_task = BashOperator(
task_id='my_task',
bash_command='echo $VAR1 $VAR2',
env={
"VAR1": '{{ ti.xcom_pull(key="var1")}}',
"VAR2": '{{ ti.xcom_pull(key="var2")}}'
},
dag=dag
)