Как извлечь значение xcom из другого экземпляра задачи в том же прогоне DAG (не самый последний)?

У меня 3 прогона DAG:

  1. DAGR 1 исполнено в 2019-02-13 16:00:00
  2. DAGR 2 исполнено в 2019-02-13 17:00:00
  3. DAGR 3 исполнено в 2019-02-13 18:00:00

В экземпляре задачи X из DAGR 1 Я хочу получить значение xcom экземпляра задачи Y, Я сделал это:

kwargs['task_instance'].xcom_pull(task_ids='Y')

Я ожидал получить значение xcom из экземпляра задачи Y в DAGR 1, Вместо этого я получил от DAGR 3,

Из документации Airflow

Если xcom_pull передается одна строка для task_idsзатем возвращается самое последнее значение XCom из этой задачи; ...

  1. Почему воздушный поток xcom_pull вернуть самое последнее значение xcom?
  2. Что делать, если я хочу вытащить из того же DAG запустить?

2 ответа

Это отвечает на ваш вопрос [Как извлечь значение xcom из другого экземпляра задачи в том же прогоне DAG (не самого последнего)? ]
Смотрите пример ниже:

t1 = SomeOperator(
        task_id='Your_t1_Task_ID',
        ...
        ...
        dag=dag)

    def get_records(**kwargs):
        ti = kwargs['ti']
        xcom = ti.xcom_pull(task_ids='Your_t1_Task_ID')
        string_to_print = 'Value in xcom is: {}'.format(xcom)
        #string_to_print holds that value, you can also print it in the logs
        logging.info(string_to_print)

    t2 = PythonOperator(
        task_id='records',
        provide_context=True,
        python_callable=get_records,
        dag=dag)

    t1 >> t2
  • Я думаю, что вы ищете include_prior_dates параметр xcom_pull() метод
  • Обратите внимание, что он вернет всю историю Xcom с (pythonlist каждый элемент один xcom запись) подталкивается данным task (отфильтровано task_id (s)), а затем вам придется вручную отфильтровать желаемый xcom с помощью execution_date поле
  • Это может быть трудно точно поставить execution_date для фильтрации; для этого посмотрим, как они реализовали execution_delta а также execution_date_fn параметры в ExternalTaskSensor
Другие вопросы по тегам