Хотите использовать TriggerDagRunOperator в Airflow, чтобы запускать множество вложенных тегов, используя только Main-dag с bashoperator (оператор sub-dag)

Невозможно понять концепцию полезной нагрузки в воздушном потоке с помощью TriggerDagRunOperator. Пожалуйста, помогите мне понять этот термин очень простым способом.

2 ответа

Надеюсь, это поможет, у меня тоже были проблемы с настройкой динамической полезной нагрузки:

      num_runs = 3
runs = [str(uuid.uuid4()) for _ in range(num_runs)]

run_dags = TriggerDagRunOperator.partial(
    task_id='test_07_few_opt_ins_triggered_dag',
    trigger_dag_id='test_07_few_opt_ins_triggered_dag',
).expand(
    trigger_run_id=runs,
    conf=[{"line": "1"}, {"line": "2"}, {"line": "3"}]
)

Выше у нас есть 3 прогона, и нам нужно установитьexpandзаполнение конфа таким же количеством «прогонов».

Затем в сработавшем DAG:

      @task
def start(dag_run=None):
    print(f"consuming line {dag_run.conf.get('line')}")

start()

The TriggerDagRunOperator triggers a DAG run for a specified dag_id. This needs a trigger_dag_id with type string and a python_callable param which is a reference to a python function that will be called while passing it the context object and a placeholder object obj for your callable to fill and return if you want a DagRun created. This obj object contains a run_id and payload attribute that you can modify in your function.

The run_id should be a unique identifier for that DAG run, and the payload has to be a picklable object that will be made available to your tasks while executing that DAG run. Your function header should look like def foo(context, dag_run_obj):

picklable simply means it can be serialized by the pickle module. For a basic understanding of this, see what can be pickled and unpickled?. The pickle protocol provides more details, and shows how classes can customize the process.

Reference: https://github.com/apache/airflow/blob/d313d8d24b1969be9154b555dd91466a2489e1c7/airflow/operators/dagrun_operator.py

Другие вопросы по тегам