Воздушный поток: получить идентификатор предыдущей задачи в следующей задаче
У меня есть 2 задачи. В первом случае оператор python что-то вычисляет, а во втором я хочу использовать вывод оператора python в операторе Http. Вот мой код:
source_list = ['account', 'sales']
for source_type in source_list:
t2 = PythonOperator(
task_id='compute_next_gather_time_for_' + source_type,
python_callable=compute_next_gather_time,
provide_context=True,
trigger_rule=TriggerRule.ALL_SUCCESS,
op_args=[source_type],
retries=3
)
t3 = SimpleHttpOperator(
task_id='request_' + source_type + '_report',
method='POST',
http_conn_id='abc',
endpoint=endpoint,
data=json.dumps({
"query": {
"start": "{{ task_instance.xcom_pull(task_ids='prev_task_id') }}",
"stop": str(yesterday),
"fields": [
1
]
}
}),
headers={"Content-Type": "application/json", "Authorization": 'abc'},
response_check=lambda response: True if len(response.json()) == 0 else False,
log_response=True,
retries=3
)
Запрос: я хочу передать предыдущий идентификатор задачи в t3 в ее переменную данных. Я не уверен, как это сделать, поскольку идентификатор задачи t2 не является постоянным. Это меняется с изменением source_type. Видимо, когда я пытался это не сделать это.
3 ответа
Я смог получить это, выполнив следующие действия:
next(iter(context['task'].upstream_task_ids))
Раньше я не использовал шаблоны Jinja ни в одном из своих DAG, но столкнулся с похожими проблемами, когда мне нужно было извлечь значения XCOM из конкретной задачи, которая имеет динамически генерируемый task_id.
Вы можете определить task_ids
в Т3 так же, как вы определили task_id
в Т2. Например:
source_list = ['account', 'sales']
for source_type in source_list:
task_id='compute_next_gather_time_for_' + source_type
t2 = PythonOperator(
task_id=task_id,
python_callable=compute_next_gather_time,
provide_context=True,
trigger_rule=TriggerRule.ALL_SUCCESS,
op_args=[source_type],
retries=3
)
t3 = SimpleHttpOperator(
task_id='request_' + source_type + '_report',
method='POST',
http_conn_id='abc',
endpoint=endpoint,
data=json.dumps({
"query": {
"start": "{{ task_instance.xcom_pull(task_ids=task_id) }}",
"stop": str(yesterday),
"fields": [
1
]
}
}),
headers={"Content-Type": "application/json", "Authorization": 'abc'},
response_check=lambda response: True if len(response.json()) == 0 else False,
log_response=True,
retries=3
)
Чтобы немного уточнить ответ @cosbor11.
В этом подходе используется объект Airflow, извлеченный из аргументов ключевых слов, предоставленных Airflow во время выполнения DAG. Их когда-то называлиcontext
и был спорPythonOperator
provide_context
, но я считаю, что сейчас это устарело. Контекст всегда предоставляется сейчас, делая доступными ,task-instance
и другие объекты и атрибуты.
Итак, мы можем тянуть такие вещи, какupstream_task_ids
изtask
объект. Используйте итератор или просто получите доступ к нему как к списку.
def my_python_callable(**context):
upstream_id = next(iter(context['task'].upstream_task_ids))
upstream_ids = context['task'].upstream_task_ids
print(f"got upstream task_id from the task object in the Airflow-provided context: {upstream_id} from a list: {upstream_ids}")
with DAG('silly_hats') as dag:
task0 = DummyOperator(task_id = 'my_spoons_too_big')
task1 = PythonOperator(task_id = 'i_am_a_banana', python_callable = my_python_callable, dag = dag)