Состояние задачи "Воздушный поток" в пределах метки
Мне нужен статус задания, например, если оно выполняется, или вы неуспешно пытаетесь выполнить его в течение того же дня. Поэтому я попытался получить его, используя приведенный ниже код, хотя я не получил вывод...
Auto = PythonOperator(
task_id='test_sleep',
python_callable=execute_on_emr,
op_kwargs={'cmd':'python /home/hadoop/test/testsleep.py'},
dag=dag)
logger.info(Auto)
Намерение состоит в том, чтобы убить определенные выполняющиеся задачи после выполнения конкретной задачи в потоке воздуха.
Вопрос в том, как мне получить состояние задачи, например, в рабочем состоянии, сбой или успех?
4 ответа
Я делаю что-то подобное. Мне нужно проверить одну задачу, если предыдущие 10 запусков другой задачи были успешными. taky2 отправил меня на правильный путь. На самом деле это довольно просто:
from airflow.models import TaskInstance
ti = TaskInstance(*your_task*, execution_date)
state = ti.current_state()
Поскольку я хочу проверить, что в пределах метки, нет необходимости указывать метку. Я просто создал функцию, чтобы перебрать прошлые n_days и проверить статус.
def check_status(**kwargs):
last_n_days = 10
for n in range(0,last_n_days):
date = kwargs['execution_date']- timedelta(n)
ti = TaskInstance(*my_task_id*, date)
state = ti.current_state()
if state != 'success':
raise ValueError('Not all previous tasks successfully completed.')
Когда вы вызываете функцию, убедитесь, что вы установили provide_context.
check_success_days_before = PythonOperator(
task_id='check_success_days_before',
python_callable= check_status,
provide_context=True,
dag=dag
)
Посмотрите на код, отвечающий за работу интерфейса командной строки, предложенный Priyank.
def task_state(args):
dag = get_dag(args)
task = dag.get_task(task_id=args.task_id)
ti = TaskInstance(task, args.execution_date)
print(ti.current_state())
Следовательно, кажется, что вы легко сможете сделать это в своей кодовой базе DAG, используя подобный код.
В качестве альтернативы вы можете выполнить эти операции CLI из своего кода, используя Python subprocess
библиотека.
Ладно, я думаю, что знаю, что ты делаешь, и я не совсем согласен с этим, но я начну с ответа.
Простой, но хакерский способ - запросить таблицу task_instance. Я в postgres, но структура должна быть такой же. Начните с получения значения task_ids и состояния интересующей вас задачи с помощью вызова db.
SELECT task_id, state
FROM task_instance
WHERE dag_id = '<dag_id_attrib>'
AND execution_date = '<execution_date_attrib>'
AND task_id = '<task_to_check>'
Это должно дать вам состояние (и название, для справки) задачи, которую вы пытаетесь отслеживать. Состояние хранится в виде простой строчной строки.
Вы можете использовать интерфейс командной строки для этого:
airflow task_state [-h] [-sd SUBDIR] dag_id task_id execution_date
Для получения дополнительной информации об этом вы можете обратиться к официальной документации воздушного потока: