Состояние задачи "Воздушный поток" в пределах метки

Мне нужен статус задания, например, если оно выполняется, или вы неуспешно пытаетесь выполнить его в течение того же дня. Поэтому я попытался получить его, используя приведенный ниже код, хотя я не получил вывод...

Auto = PythonOperator(
    task_id='test_sleep',
    python_callable=execute_on_emr,
    op_kwargs={'cmd':'python /home/hadoop/test/testsleep.py'},
    dag=dag)

logger.info(Auto)

Намерение состоит в том, чтобы убить определенные выполняющиеся задачи после выполнения конкретной задачи в потоке воздуха.

Вопрос в том, как мне получить состояние задачи, например, в рабочем состоянии, сбой или успех?

4 ответа

Я делаю что-то подобное. Мне нужно проверить одну задачу, если предыдущие 10 запусков другой задачи были успешными. taky2 отправил меня на правильный путь. На самом деле это довольно просто:

from airflow.models import TaskInstance
ti = TaskInstance(*your_task*, execution_date)
state = ti.current_state()

Поскольку я хочу проверить, что в пределах метки, нет необходимости указывать метку. Я просто создал функцию, чтобы перебрать прошлые n_days и проверить статус.

def check_status(**kwargs):
    last_n_days = 10
    for n in range(0,last_n_days):
        date = kwargs['execution_date']- timedelta(n)
        ti = TaskInstance(*my_task_id*, date)
        state = ti.current_state()
        if state != 'success':
            raise ValueError('Not all previous tasks successfully completed.')

Когда вы вызываете функцию, убедитесь, что вы установили provide_context.

check_success_days_before = PythonOperator(
    task_id='check_success_days_before',
    python_callable= check_status,
    provide_context=True,
    dag=dag
)

Посмотрите на код, отвечающий за работу интерфейса командной строки, предложенный Priyank.

https://github.com/apache/incubator-airflow/blob/2318cea74d4f71fba353eaca9bb3c4fd3cdb06c0/airflow/bin/cli.py

def task_state(args):
    dag = get_dag(args)
    task = dag.get_task(task_id=args.task_id)
    ti = TaskInstance(task, args.execution_date)
    print(ti.current_state())

Следовательно, кажется, что вы легко сможете сделать это в своей кодовой базе DAG, используя подобный код.

В качестве альтернативы вы можете выполнить эти операции CLI из своего кода, используя Python subprocess библиотека.

Ладно, я думаю, что знаю, что ты делаешь, и я не совсем согласен с этим, но я начну с ответа.

Простой, но хакерский способ - запросить таблицу task_instance. Я в postgres, но структура должна быть такой же. Начните с получения значения task_ids и состояния интересующей вас задачи с помощью вызова db.

SELECT task_id, state
FROM task_instance
WHERE dag_id = '<dag_id_attrib>'
  AND execution_date = '<execution_date_attrib>'
  AND task_id = '<task_to_check>'

Это должно дать вам состояние (и название, для справки) задачи, которую вы пытаетесь отслеживать. Состояние хранится в виде простой строчной строки.

Вы можете использовать интерфейс командной строки для этого:

 airflow task_state [-h] [-sd SUBDIR] dag_id task_id execution_date

Для получения дополнительной информации об этом вы можете обратиться к официальной документации воздушного потока:

http://airflow.incubator.apache.org/cli.html

Другие вопросы по тегам