Я не могу xcom_push аргументы через BashOperator

Я новичок в Xcom функции Airflow. я попробовал это с PythonOperator, и он работал нормально (то есть я могу выдвинуть и извлечь значение из контекста), но когда я попробовал это на BashOperator, это не сработало. Однако я могу получить только последний оператор stdout, добавив атрибут xcom_push=True во время создания задачи. это одна вещь. 2) Но я также хотел бы выдвигать и извлекать значения, основанные на их ключах (в BashOp и обратно), как мы это делаем в PythonOp. Это было бы очень полезно, так как мне нужно передать тонны переменных из одного скрипта в другой.

3 ответа

Решение

Это то, что вы хотите?

from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id="example_bash_operator_1",
    schedule_interval=None,
    start_date=datetime(2018, 12, 31),
)

t1 = BashOperator(
    task_id="t1",
    bash_command='echo "{{ ti.xcom_push(key="k1", value="v1") }}" "{{ti.xcom_push(key="k2", value="v2") }}"',
    dag=dag,
)

t2 = BashOperator(
    task_id="t2",
    bash_command='echo "{{ ti.xcom_pull(key="k1") }}" "{{ ti.xcom_pull(key="k2") }}"',
    dag=dag,
)

t1 >> t2

@SpaceyBot и Лукас ответили на ваш первый вопрос. По поводу второго поднятого вопроса

Цитата

2) Но я также хочу нажимать и извлекать значения на основе их ключей (в и из BashOp), как мы это делаем в PythonOp. Это было бы действительно полезно, поскольку мне нужно передать тонны переменных из одного скрипта в еще один.

Цитата

  • это не рекомендуется. Все действия XCom pull/push транслируются в операторы Insert/Select в базе данных воздушного потока. Это снизит производительность планировщика со временем и замедлит всю обработку либо из-за большого количества запускаемых запросов (запросов), либо из-за большого количества извлекаемых строк, которые будут извлечены посредством сканирования полной таблицы вместо сканирования на основе индекса.

Поэтому здесь лучше рассмотреть другой механизм - хранение информации во внешних файлах json/csv/txt /.. и т. Д.

Итог - XCom предназначен для передачи только небольших объемов данных, в основном счетчиков и переменных состояния.

В дополнение к ответу @Ryan Yuan вы можете использовать параметр envпринадлежащий BashOperatorчтобы установить переменные среды для вашего скрипта/команды bash.

      my_task = BashOperator(
    task_id='my_task',
    bash_command='echo $VAR1 $VAR2',
    env={
        "VAR1": '{{ ti.xcom_pull(key="var1")}}',
        "VAR2": '{{ ti.xcom_pull(key="var2")}}'
    },
    dag=dag
)

Другие вопросы по тегам