Airflow - Как передать переменную xcom в функцию Python

Мне нужно сослаться на переменную, которая возвращается BashOperator, Возможно, я делаю это неправильно, поэтому, пожалуйста, прости меня. В моем task_archive_s3_fileМне нужно получить имя файла от get_s3_file, Задача просто печатает {{ ti.xcom_pull(task_ids=submit_file_to_spark) }} в виде строки вместо значения.

Если я использую bash_command, значение печатается правильно.

get_s3_file = PythonOperator(
    task_id='get_s3_file',
    python_callable=obj.func_get_s3_file,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag)

submit_file_to_spark = BashOperator(
    task_id='submit_file_to_spark',
    bash_command="echo 'hello world'",
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag)

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
#    bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
    python_callable=obj.func_archive_s3_file,
    params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
    dag=dag)

get_s3_file >> submit_file_to_spark >> task_archive_s3_file

6 ответов

Решение

Шаблоны как {{ ti.xcom_pull(...) }} может использоваться только внутри параметров, которые поддерживают шаблоны, или они не будут отображаться до выполнения. Увидеть template_fields а также template_ext атрибуты PythonOperator и BashOperator.

Так templates_dict это то, что вы используете для передачи шаблонов вашему оператору Python:

def func_archive_s3_file(**context):
    archive(context['templates_dict']['s3_path_filename'])

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,  # must pass this because templates_dict gets passed via context
    templates_dict={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}" })

Однако в случае извлечения значения XCom, другой альтернативой является просто использование TaskInstance объект доступен для вас через контекст:

def func_archive_s3_file(**context):
    archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,

Поднял вопрос и ответ, но я думаю, что это можно сделать немного более понятным для тех пользователей, которые просто хотят передавать небольшие объекты данных между PythonOperator задачи в своих DAG. Ссылка на этот вопрос и этот пример XCom привела меня к следующему решению. Супер просто:

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

DAG = DAG(
  dag_id='example_dag',
  start_date=datetime.now(),
  schedule_interval='@once'
)

def push_function(**kwargs):
    ls = ['a', 'b', 'c']
    return ls

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    ls = ti.xcom_pull(task_ids='push_task')
    print(ls)

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task

Я не уверен, почему это работает, но это работает. Несколько вопросов для сообщества:

  • Что происходит с ti Вот? Как это встроено в **kwargs?
  • Является provide_context=True необходимо для обеих функций?

Любые изменения, чтобы сделать этот ответ более ясным, очень приветствуются!

Вот еще одно простое решение. Это модифицированная версия примера @Aaron, но я использовал пару ключ-значение при общении:

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

DAG = DAG(
  dag_id='simple_xcom',
  start_date=datetime(2017, 10, 26),
  schedule_interval=timedelta(1)
)

def push_function(**context):
    msg='the_message'
    print("message to push: '%s'" % msg)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="the_message", value=msg)

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='push_task',key='the_message')
    print("received message: '%s'" % msg)

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task

В Airflow 2.0 (выпущенном в декабре 2020 г.) API TaskFlow упростил прохождение XComs. С помощью этого API вы можете просто возвращать значения из функций, аннотированных с помощью @task, и они будут передаваться как XCom за кулисами. Пример из учебника:

          @task()
    def extract():
        ...
        return order_data_dict
    
    @task()
    def transform(order_data_dict: dict):
        ...
        return total_order_value

    order_data = extract()
    order_summary = transform(order_data)

В этом примере имеет тип XComArg. Он хранит словарь, возвращенный extractзадача. Когда transformзадание выполняется, order_dataразворачивается, и задача получает простой объект Python, который был сохранен.

Если вы хотите передать xcom оператору bash в airflow 2, используйте ; скажем, вы нажали на xcom my_xcom_var, тогда вы можете использовать jinja внутри envчтобы вытащить значение xcom, например

      BashOperator(
    task_id=mytask,
    bash_command="echo ${MYVAR}",
    env={"MYVAR": '{{ ti.xcom_pull(key=\'my_xcom_var\') }}'},
    dag=dag
)

Проверьте https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/bash/index.html#module-airflow.operators.bash для получения более подробной информации.

BaseOperator Airflow определяет свойствоoutputкоторые вы можете использовать для доступа кxcomсодержание данного оператора. Вот конкретный пример

      with DAG(...):
    push_task = PythonOperator(
        task_id='push_task', 
        python_callable=lambda: 'Hello, World!')

    pull_task = PythonOperator(
        task_id='pull_task', 
        python_callable=lambda x: print(x),
        op_args=[push_task.output])

что должно быть почти эквивалентно

      with DAG(...):
    push_task = PythonOperator(
        task_id='push_task', 
        python_callable=lambda: 'Hello, World!')

    pull_task = PythonOperator(
        task_id='pull_task', 
        python_callable=lambda x: print(x),
        op_args=["{{ task_instance.xcom_pull('push_task') }}"])

Насколько я знаю, единственная разница в том, что первый неявно определяетpush_task >> pull_task.

Другие вопросы по тегам