Хотите использовать TriggerDagRunOperator в Airflow, чтобы запускать множество вложенных тегов, используя только Main-dag с bashoperator (оператор sub-dag)
Невозможно понять концепцию полезной нагрузки в воздушном потоке с помощью TriggerDagRunOperator. Пожалуйста, помогите мне понять этот термин очень простым способом.
2 ответа
Надеюсь, это поможет, у меня тоже были проблемы с настройкой динамической полезной нагрузки:
num_runs = 3
runs = [str(uuid.uuid4()) for _ in range(num_runs)]
run_dags = TriggerDagRunOperator.partial(
task_id='test_07_few_opt_ins_triggered_dag',
trigger_dag_id='test_07_few_opt_ins_triggered_dag',
).expand(
trigger_run_id=runs,
conf=[{"line": "1"}, {"line": "2"}, {"line": "3"}]
)
Выше у нас есть 3 прогона, и нам нужно установитьexpand
заполнение конфа таким же количеством «прогонов».
Затем в сработавшем DAG:
@task
def start(dag_run=None):
print(f"consuming line {dag_run.conf.get('line')}")
start()
The TriggerDagRunOperator
triggers a DAG run for a specified dag_id
. This needs a trigger_dag_id
with type string
and a python_callable param which is a reference to a python function that will be called while passing it the context
object and a placeholder object obj
for your callable to fill and return if you want a DagRun created. This obj
object contains a run_id
and payload
attribute that you can modify in your function.
The run_id
should be a unique identifier for that DAG run, and the payload has to be a picklable object that will be made available to your tasks while executing that DAG run. Your function header should look like def foo(context, dag_run_obj):
picklable simply means it can be serialized by the pickle module. For a basic understanding of this, see what can be pickled and unpickled?. The pickle protocol provides more details, and shows how classes can customize the process.