Создание нескольких задач в Airflow DAG для индивидуальной обработки
В моей DAG есть части, которые генерируют списки, которые я не могу разбить на отдельные задачи, которые будут обрабатываться по отдельности.
Вот псевдо-пример:
def push(**kwargs):
# """Pushes an XCom without a specific target"""
for n in range(10):
kwargs['ti'].xcom_push(key=f'vals', value=n)
def puller(**kwargs):
ti = kwargs['ti']
v1 = ti.xcom_pull(key='vals', task_ids='push')
print(v1)
push = python_operator.PythonOperator(
task_id='push',
python_callable=push,
provide_context=True
)
puller = python_operator.PythonOperator(
task_id='puller',
python_callable=puller,
provide_context=True
)
Похоже, что xcom_push использует только последнее значение, а не генерирует список. Поэтому мне пришлось бы загружать значения в push в список, а затем использовать цикл for в pull для обработки каждого элемента в отдельности.
Я прекрасно справляюсь с этим, но это кажется нелогичным при выполнении пакетных заданий.
Как бы мне заставить пуллер тянуть одно из 10 заданий, созданных нажатием?
1 ответ
Между запусками DAG вы не должны изменять структуру DAG, поэтому ваш съемник - это либо одна задача, предназначенная для получения всех значений, либо 10 задач, каждая из которых предназначена для получения одного из значений.
Вот как вы бы добавили все 10 значений с помощью xcom:
def push(**kwargs):
# """Pushes an XCom without a specific target"""
final_output = []
for n in range(10):
# doing work
final_output.append(n)
kwargs['ti'].xcom_push(key=f'vals', value=final_output)
push = python_operator.PythonOperator(
task_id='push',
python_callable=push,
provide_context=True
)
И тогда вы можете вытащить все 10 из них, как это
def puller(**kwargs):
ti = kwargs['ti']
v1 = ti.xcom_pull(key='vals', task_ids='push')
print(v1) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
puller = python_operator.PythonOperator(
task_id='puller',
python_callable=puller,
provide_context=True
)
Или одно значение для каждой из десяти задач:
def puller(index=0, **kwargs):
ti = kwargs['ti']
v1 = ti.xcom_pull(key='vals', task_ids='push')[index]
print(v1)
ten_ops = [python_operator.PythonOperator(
task_id=f'puller_{n}',
python_callable=puller,
provide_context=True,
op_kwargs={'index': n},
) for n in range(10)]
Я надеюсь, что это поможет, если я не понял вопрос.