Как извлечь значение xcom из другого экземпляра задачи в том же прогоне DAG (не самый последний)?
У меня 3 прогона DAG:
- DAGR 1 исполнено в 2019-02-13 16:00:00
- DAGR 2 исполнено в 2019-02-13 17:00:00
- DAGR 3 исполнено в 2019-02-13 18:00:00
В экземпляре задачи X
из DAGR 1
Я хочу получить значение xcom экземпляра задачи Y
, Я сделал это:
kwargs['task_instance'].xcom_pull(task_ids='Y')
Я ожидал получить значение xcom из экземпляра задачи Y
в DAGR 1
, Вместо этого я получил от DAGR 3
,
Из документации Airflow
Если
xcom_pull
передается одна строка дляtask_ids
затем возвращается самое последнее значение XCom из этой задачи; ...
- Почему воздушный поток
xcom_pull
вернуть самое последнее значение xcom? - Что делать, если я хочу вытащить из того же DAG запустить?
2 ответа
Это отвечает на ваш вопрос [Как извлечь значение xcom из другого экземпляра задачи в том же прогоне DAG (не самого последнего)? ]
Смотрите пример ниже:
t1 = SomeOperator(
task_id='Your_t1_Task_ID',
...
...
dag=dag)
def get_records(**kwargs):
ti = kwargs['ti']
xcom = ti.xcom_pull(task_ids='Your_t1_Task_ID')
string_to_print = 'Value in xcom is: {}'.format(xcom)
#string_to_print holds that value, you can also print it in the logs
logging.info(string_to_print)
t2 = PythonOperator(
task_id='records',
provide_context=True,
python_callable=get_records,
dag=dag)
t1 >> t2
- Я думаю, что вы ищете
include_prior_dates
параметрxcom_pull()
метод - Обратите внимание, что он вернет всю историю
Xcom
с (python
list
каждый элемент одинxcom
запись) подталкивается даннымtask
(отфильтрованоtask_id
(s)), а затем вам придется вручную отфильтровать желаемыйxcom
с помощьюexecution_date
поле - Это может быть трудно точно поставить
execution_date
для фильтрации; для этого посмотрим, как они реализовалиexecution_delta
а такжеexecution_date_fn
параметры вExternalTaskSensor