Как выполнять ежечасные инкрементные извлечения из источника данных оракула с использованием last_mod_dt (отметка времени) в Airflow?

Необходимо выполнять ежечасное обновление / загрузку таблицы с помощью Airflow/Python из источника данных oracle с использованием столбца timestamp last_modified_dt.

В Airflow есть airflow.models.taskinstance API, который предоставляет данные из таблицы метаданных task_instance и имеет следующие поля (показаны с образцами данных), предполагая, что дата / время первого выполнения dag / tasks были 01.01.2020 05:00:-

task_id, dag_id, execution_datetime (of dag), start_date, end_date, duration, state, ....
task_a, oracle, 1/1/2020 05:00:00, 1/1/2020 05:00:00, 1/1/2020 05:05:00, 0.5, success, ....
task_b, oracle, 1/1/2020 05:00:00, 1/1/2020 05:01:00, 1/1/2020 05:04:00, 0.3, success, ....
task_c, oracle, 1/1/202005:00:00, 1/1/2020 05:02:00, 1/1/2020 05:06:00, 0.4, success, ....

Итак, я подумываю использовать эту таблицу метаданных task_instance или API, чтобы получить предыдущую дату и время начала каждой задачи и ее состояние (успех) и использовать это в условиях, как показано ниже:

Итак, при запуске через час на 01.01.2020 06:00:00:-

select * from table_a where last_mod_dttm > prev(start_datetime of task_id=task_a) and state = sucesss;
select * from table_b where last_mod_dttm > prev(start_datetime of task_id=task_b) and state = sucesss;
select * from table_c where last_mod_dttm > prev(start_datetime of task_id=task_c) and state = sucesss;

Это правильный подход? Если да, то прямой запрос таблицы task_instance метаданных воздушного потока каждый раз, чтобы получить предыдущее или максимальное (start_datetime) задачи (s), будет иметь какие-либо последствия для производительности? Если да, то как получить предыдущее значение start_datetime и "успешное" состояние задачи через API airflow.models.taskinstance (https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html)

Благодаря!

1 ответ

Решение

Для начала важно понять, как execution_dateработает, см. Scheduler Doc:

Планировщик не будет запускать ваши задачи до тех пор, пока не закончится период, который он охватывает, например, задание с schedule_interval, установленным как @daily, запускается после окончания дня. Этот метод гарантирует, что все данные, необходимые для этого периода, будут полностью доступны до выполнения dag. В пользовательском интерфейсе кажется, что Airflow выполняет ваши задачи на день позже.

Если вы запускаете DAG с интервалом schedule_interval в один день, запуск с execute_date 2019-11-21 запускается вскоре после 2019-11-21T23:59.

Повторим, планировщик запускает ваше задание через один schedule_interval ПОСЛЕ даты начала, в КОНЕЦ периода.

Это означает, ссылаясь на execution_date вы получите точное время, когда был запущен последний запуск.

Что касается запроса, я бы не стал запрашивать базу данных, чтобы получить дату последнего выполнения, а использовал бы макросы, которые поставляются из коробки с Airflow - см. Эту ссылку:

Вы должны просто использовать {{ execution_date }} в вашем запросе, и Airflow должен заменить его при запуске DAG.

Другие вопросы по тегам