Создавайте динамические рабочие процессы в Airflow со значением XCOM

Теперь я создаю несколько задач, используя такую ​​переменную, и она отлично работает.

      with DAG(....) as dag:
    body = Variable.get("config_table", deserialize_json=True)
    for i in range(len(body.keys())):
        simple_task = Operator(
            task_id = 'task_' + str(i),
            .....

Но мне почему-то нужно использовать значение XCOM вместо использования переменной. Можно ли динамически создавать задачи с использованием значения XCOM pull?

Я пытаюсь установить такое значение, но оно не работает

body = "{{ ti.xcom_pull(key='config_table', task_ids='get_config_table') }}"

2 ответа

Решение

Это невозможно, и в целом динамические задачи не рекомендуются:

  1. Планировщик Airflow работает следующим образом: читает файл dag, загружает задачи в память, а затем проверяет, какие dag и какие задачи нужно запланировать, в то время как xcom - это значения времени выполнения, которые связаны с конкретным запуском dag, поэтому планировщик не может передавать значения xcom.
  2. При использовании динамических задач вы значительно усложняете отладку для себя, поскольку значения, которые вы используете для создания dag, могут измениться, и вы потеряете доступ к журналам, даже не понимая почему.

Что вы можете сделать, так это использовать оператор ветвления, чтобы эти задачи всегда выполнялись, и просто пропускать их в зависимости от значения xcom. Например:

      def branch_func(**context)
    return f"task_{context['ti'].xcom_pull(key=key)}"


branch = BranchPythonOperator(
    task_id="branch",
    python_callback=branch_func
)

tasks = [BaseOperator(task_id=f"task_{i}") for i in range(3)]
branch >> tasks

В некоторых случаях также нецелесообразно использовать этот метод (например, когда у меня 100 возможных задач), в этих случаях я бы рекомендовал написать свой собственный оператор или использовать один PythonOperator.

Можно динамически создавать задачи из XComsсозданный из предыдущей задачи, есть более обширные обсуждения по этой теме, например, в этом вопросе . Один из предлагаемых подходов следует этой структуре, вот рабочий пример, который я сделал:

sample_file.json:

      {
    "cities": [ "London", "Paris", "BA", "NY" ]
}
  • Получите данные из API, файла или любого источника. Пуши как.
      
def _process_obtained_data(ti):
    list_of_cities = ti.xcom_pull(task_ids='get_data')
    Variable.set(key='list_of_cities',
                 value=list_of_cities['cities'], serialize_json=True)

def _read_file():
    with open('dags/sample_file.json') as f:
        data = json.load(f)
        # push to XCom using return
        return data


with DAG('dynamic_tasks_example', schedule_interval='@once',
         start_date=days_ago(2),
         catchup=False) as dag:

    get_data = PythonOperator(
        task_id='get_data',
        python_callable=_read_file)
  • Добавьте вторую задачу, которая будет извлекать из XCom и задайте для данных, которые вы будете использовать для итерации позже.
          preparation_task = PythonOperator(
        task_id='preparation_task',
        python_callable=_process_obtained_data)

* Конечно, при желании вы можете объединить обе задачи в одну. Я предпочитаю этого не делать, потому что обычно я беру подмножество полученных данных для создания файла.

  • Прочтите это, а затем повторите. Очень важно определить default_var.
          end = DummyOperator(
        task_id='end',
        trigger_rule='none_failed')

    # Top-level code within DAG block
    iterable_list = Variable.get('list_of_cities',
                                 default_var=['default_city'],
                                 deserialize_json=True)
  • Объявляйте динамические задачи и их зависимости в цикле. Сделать task_id уникальные. TaskGroup не является обязательным, помогает сортировать пользовательский интерфейс.
      
    with TaskGroup('dynamic_tasks_group',
                   prefix_group_id=False,
                   ) as dynamic_tasks_group:

        for index, city in enumerate(iterable_list):
            say_hello = PythonOperator(
                task_id=f'say_hello_from_{city}',
                python_callable=_print_greeting,
                op_kwargs={'city_name': city, 'greeting': 'Hello'}
            )
            say_goodbye = PythonOperator(
                task_id=f'say_goodbye_from_{city}',
                python_callable=_print_greeting,
                op_kwargs={'city_name': city, 'greeting': 'Goodbye'}
            )

            # TaskGroup level dependencies
            say_hello >> say_goodbye

# DAG level dependencies
get_data >> preparation_task >> dynamic_tasks_group >> end

Просмотр графика DAG:

Импорт:

      import json
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

Что нужно иметь в виду:

  • Если у вас есть одновременные dag_runs этого же DAG, все они будут использовать одну и ту же переменную, поэтому вам может потребоваться сделать ее «уникальной», различая их имена.
  • Вы должны установить значение по умолчанию при чтении Variable, в противном случае первое выполнение может быть невозможно обработать Scheduler.
  • Пользовательский интерфейс представления графика воздушного потока может не обновлять изменения сразу. Это происходит особенно при первом запуске после добавления или удаления элементов из итерируемого объекта, на котором создается динамическая генерация задачи.
  • Если вам нужно читать из многих переменных, важно помнить, что рекомендуется хранить их в одном значении JSON, чтобы избежать постоянного создания подключений к базе данных метаданных (пример в этой статье ).

Удачи!

Другие вопросы по тегам