Создание нескольких задач в Airflow DAG для индивидуальной обработки

В моей DAG есть части, которые генерируют списки, которые я не могу разбить на отдельные задачи, которые будут обрабатываться по отдельности.

Вот псевдо-пример:

def push(**kwargs):
    # """Pushes an XCom without a specific target"""
    for n in range(10):
        kwargs['ti'].xcom_push(key=f'vals', value=n)

def puller(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key='vals', task_ids='push')
    print(v1)

push = python_operator.PythonOperator(
    task_id='push',
    python_callable=push,
    provide_context=True
)

puller = python_operator.PythonOperator(
    task_id='puller',
    python_callable=puller,
    provide_context=True
)

Похоже, что xcom_push использует только последнее значение, а не генерирует список. Поэтому мне пришлось бы загружать значения в push в список, а затем использовать цикл for в pull для обработки каждого элемента в отдельности.

Я прекрасно справляюсь с этим, но это кажется нелогичным при выполнении пакетных заданий.

Как бы мне заставить пуллер тянуть одно из 10 заданий, созданных нажатием?

1 ответ

Между запусками DAG вы не должны изменять структуру DAG, поэтому ваш съемник - это либо одна задача, предназначенная для получения всех значений, либо 10 задач, каждая из которых предназначена для получения одного из значений.

Вот как вы бы добавили все 10 значений с помощью xcom:

def push(**kwargs):
    # """Pushes an XCom without a specific target"""
    final_output = []
    for n in range(10):
        # doing work
        final_output.append(n)
    kwargs['ti'].xcom_push(key=f'vals', value=final_output)

push = python_operator.PythonOperator(
    task_id='push',
    python_callable=push,
    provide_context=True
)

И тогда вы можете вытащить все 10 из них, как это

def puller(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key='vals', task_ids='push')
    print(v1)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

puller = python_operator.PythonOperator(
    task_id='puller',
    python_callable=puller,
    provide_context=True
)

Или одно значение для каждой из десяти задач:

def puller(index=0, **kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key='vals', task_ids='push')[index]
    print(v1)

ten_ops = [python_operator.PythonOperator(
        task_id=f'puller_{n}',
        python_callable=puller,
        provide_context=True,
        op_kwargs={'index': n},
    ) for n in range(10)]

Я надеюсь, что это поможет, если я не понял вопрос.

Другие вопросы по тегам