Воздушный поток, XCom и несколько task_ids

Как работает task_ids, если указано несколько задач?

В этом конкретном примере кода я ожидал получить load_cycle_id_2 от обеих задач в кортеже (5555,22222), но вместо этого он вышел (None, 22222).

Это почему?

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

args = {
    'owner': 'airflow',
    'start_date': datetime.now(),
    'provide_context': True
}

demo_dag = DAG(dag_id='first', start_date=datetime.now(), schedule_interval='@once',default_args=args)

def push_load_id(**kwargs):
    kwargs['ti'].xcom_push(key='load_cycle_id_2',value=22222)
    kwargs['ti'].xcom_push(key='load_cycle_id_3',value=44444)

def another_push_load_id(**kwargs):
    kwargs['ti'].xcom_push(key='load_cycle_id_2',value=5555)
    kwargs['ti'].xcom_push(key='anotherload_cycle_id_3',value=6666)

def pull_load_id(**kwargs):
    ti = kwargs['ti'].xcom_pull(key='load_cycle_id_2', task_ids=['another_push_load_id','push_load_id'])
    print(ti)

push_operator = PythonOperator(task_id='push_load_id', python_callable=push_load_id, dag=demo_dag)
pull_operator = PythonOperator(task_id='pull_load_id', python_callable=pull_load_id, dag=demo_dag)

push_operator >> pull_operator

1 ответ

Решение

Твой Дагс работает только push_load_id а такжеpull_load_id функции. Вы не создаете оператора, который использует another_push_load_id функция.

Конец вашего кода должен выглядеть так:

push_operator = PythonOperator(task_id='push_load_id', python_callable=push_load_id, dag=demo_dag)
another_push_operator = PythonOperator(task_id='push_load_id', python_callable= another_push_load_id, dag=demo_dag)
pull_operator = PythonOperator(task_id='pull_load_id', python_callable=pull_load_id, dag=demo_dag)

push_operator >> another_push_operator >> pull_operator
Другие вопросы по тегам