Воздушный поток: получить идентификатор предыдущей задачи в следующей задаче

У меня есть 2 задачи. В первом случае оператор python что-то вычисляет, а во втором я хочу использовать вывод оператора python в операторе Http. Вот мой код:

source_list = ['account', 'sales']

for source_type in source_list:
    t2 = PythonOperator(
                task_id='compute_next_gather_time_for_' + source_type,
                python_callable=compute_next_gather_time,
                provide_context=True,
                trigger_rule=TriggerRule.ALL_SUCCESS,
                op_args=[source_type],
                retries=3
            )

    t3 = SimpleHttpOperator(
                task_id='request_' + source_type + '_report',
                method='POST',
                http_conn_id='abc',
                endpoint=endpoint,
                data=json.dumps({
                    "query": {
                        "start": "{{ task_instance.xcom_pull(task_ids='prev_task_id') }}",
                        "stop": str(yesterday),
                        "fields": [
                            1
                        ]
                    }
                }),
                headers={"Content-Type": "application/json", "Authorization": 'abc'},
                response_check=lambda response: True if len(response.json()) == 0 else False,
                log_response=True,
                retries=3
            )

Запрос: я хочу передать предыдущий идентификатор задачи в t3 в ее переменную данных. Я не уверен, как это сделать, поскольку идентификатор задачи t2 не является постоянным. Это меняется с изменением source_type. Видимо, когда я пытался это не сделать это.

3 ответа

Я смог получить это, выполнив следующие действия:

      next(iter(context['task'].upstream_task_ids))

Раньше я не использовал шаблоны Jinja ни в одном из своих DAG, но столкнулся с похожими проблемами, когда мне нужно было извлечь значения XCOM из конкретной задачи, которая имеет динамически генерируемый task_id.

Вы можете определить task_ids в Т3 так же, как вы определили task_id в Т2. Например:

source_list = ['account', 'sales']

for source_type in source_list:

    task_id='compute_next_gather_time_for_' + source_type

    t2 = PythonOperator(
                task_id=task_id,
                python_callable=compute_next_gather_time,
                provide_context=True,
                trigger_rule=TriggerRule.ALL_SUCCESS,
                op_args=[source_type],
                retries=3
            )

    t3 = SimpleHttpOperator(
                task_id='request_' + source_type + '_report',
                method='POST',
                http_conn_id='abc',
                endpoint=endpoint,
                data=json.dumps({
                    "query": {
                        "start": "{{ task_instance.xcom_pull(task_ids=task_id) }}",
                        "stop": str(yesterday),
                        "fields": [
                            1
                        ]
                    }
                }),
                headers={"Content-Type": "application/json", "Authorization": 'abc'},
                response_check=lambda response: True if len(response.json()) == 0 else False,
                log_response=True,
                retries=3
            )

Чтобы немного уточнить ответ @cosbor11.

В этом подходе используется объект Airflow, извлеченный из аргументов ключевых слов, предоставленных Airflow во время выполнения DAG. Их когда-то называлиcontextи был спорPythonOperator provide_context, но я считаю, что сейчас это устарело. Контекст всегда предоставляется сейчас, делая доступными ,task-instanceи другие объекты и атрибуты.

Итак, мы можем тянуть такие вещи, какupstream_task_idsизtaskобъект. Используйте итератор или просто получите доступ к нему как к списку.

      def my_python_callable(**context):
    upstream_id = next(iter(context['task'].upstream_task_ids))
    upstream_ids = context['task'].upstream_task_ids
    print(f"got upstream task_id from the task object in the Airflow-provided context: {upstream_id} from a list: {upstream_ids}")

with DAG('silly_hats') as dag:
    task0 = DummyOperator(task_id = 'my_spoons_too_big')
    task1 = PythonOperator(task_id = 'i_am_a_banana', python_callable = my_python_callable, dag = dag)
Другие вопросы по тегам