Очистить задание вверх по течению в потоке воздуха
У меня есть задача в DAG воздушного потока. у него три дочерних задания. к сожалению, есть случаи, когда это родительское задание будет выполнено успешно, но два из трех дочерних элементов потерпят неудачу (и повторная попытка дочерних элементов не исправит их).
это требует, чтобы родитель повторил попытку (даже при том, что это не терпело неудачу).
поэтому я покорно захожу в графическое представление прогона dag и "очищаю" эту родительскую задачу и все последующие задачи (+ рекурсивные).
Есть ли способ, которым я могу сделать это в самом DAG?
3 ответа
Если ваши задачи являются частью подзадачи, вызов dag.clear()
в on_retry_callback
из SubDagOperator
должен сделать трюк:
SubDagOperator(
subdag=subdag,
task_id="...",
on_retry_callback=lambda context: subdag.clear(
start_date=context['execution_date'],
end_date=context['execution_date']),
dag=dag
)
Мы выбрали clear_task_instances
метод taskinstance
:
@provide_session
def clear_tasks_fn(tis,session=None,activate_dag_runs=False,dag=None) -> None:
"""
Wrapper for `clear_task_instances` to be used in callback function
(that accepts only `context`)
"""
taskinstance.clear_task_instances(tis=tis,
session=session,
activate_dag_runs=activate_dag_runs,
dag=dag)
def clear_tasks_callback(context) -> None:
"""
Clears tasks based on list passed as `task_ids_to_clear` parameter
To be used as `on_retry_callback`
"""
all_tasks = context["dag_run"].get_task_instances()
dag = context["dag"]
task_ids_to_clear = context["params"].get("task_ids_to_clear", [])
tasks_to_clear = [ ti for ti in all_tasks if ti.task_id in task_ids_to_clear ]
clear_tasks_fn(tasks_to_clear,
dag=dag)
Вам нужно будет предоставить список задач, которые вы хотите очистить при обратном вызове, например, для любой дочерней задачи:
DummyOperator('some_child',
on_retry_callback=clear_tasks_callback,
params=dict(
task_ids_to_clear=['some_child', 'parent']
),
retries=1
)
У нас была похожая проблема, которую мы решили, поместив задачу с зависимостями, которые мы хотим повторить, в подпространство. Затем при повторных попытках sub dag мы очищаем состояние задач sub dag, используя on_retry_callback
так что они все снова бегут.
sub_dag = SubDagOperator(
retry_delay=timedelta(seconds=30),
subdag=create_sub_dag(),
on_retry_callback=callback_subdag_clear,
task_id=sub_dag_name,
dag=dag,
)
def callback_subdag_clear(context):
"""Clears a sub-dag's tasks on retry."""
dag_id = "{}.{}".format(
context['dag'].dag_id,
context['ti'].task_id,
)
execution_date = context['execution_date']
sub_dag = DagBag().get_dag(dag_id)
sub_dag.clear(
start_date=execution_date,
end_date=execution_date,
only_failed=False,
only_running=False,
confirm_prompt=False,
include_subdags=False
)
(первоначально взято отсюда https://gist.github.com/nathairtras/6ce0b0294be8c27d672e2ad52e8f2117)
Он не дает прямого ответа на ваш вопрос, но я могу предложить лучший обходной путь:
default_args = {
'start_date': datetime(2017, 12, 16),
'depends_on_past': True,
}
dag = DAG(
dag_id='main_dag',
schedule_interval='@daily',
default_args=default_args,
max_active_runs=1,
retries=100,
retry_delay= timedelta(seconds=120)
)
Установить depends_on_past
Истинно в DAG.
Затем в задачах этого dag ограничьте количество повторных попыток, используя повторные попытки
DummyOperator(
task_id='bar',
retries=0
dag=child)
Таким образом, группа обеспечения доступности баз данных помечается как сбойная при сбое любой задачи. Затем DAG будет повторен.