Как выполнять ежечасные инкрементные извлечения из источника данных оракула с использованием last_mod_dt (отметка времени) в Airflow?
Необходимо выполнять ежечасное обновление / загрузку таблицы с помощью Airflow/Python из источника данных oracle с использованием столбца timestamp last_modified_dt.
В Airflow есть airflow.models.taskinstance API, который предоставляет данные из таблицы метаданных task_instance и имеет следующие поля (показаны с образцами данных), предполагая, что дата / время первого выполнения dag / tasks были 01.01.2020 05:00:-
task_id, dag_id, execution_datetime (of dag), start_date, end_date, duration, state, ....
task_a, oracle, 1/1/2020 05:00:00, 1/1/2020 05:00:00, 1/1/2020 05:05:00, 0.5, success, ....
task_b, oracle, 1/1/2020 05:00:00, 1/1/2020 05:01:00, 1/1/2020 05:04:00, 0.3, success, ....
task_c, oracle, 1/1/202005:00:00, 1/1/2020 05:02:00, 1/1/2020 05:06:00, 0.4, success, ....
Итак, я подумываю использовать эту таблицу метаданных task_instance или API, чтобы получить предыдущую дату и время начала каждой задачи и ее состояние (успех) и использовать это в условиях, как показано ниже:
Итак, при запуске через час на 01.01.2020 06:00:00:-
select * from table_a where last_mod_dttm > prev(start_datetime of task_id=task_a) and state = sucesss;
select * from table_b where last_mod_dttm > prev(start_datetime of task_id=task_b) and state = sucesss;
select * from table_c where last_mod_dttm > prev(start_datetime of task_id=task_c) and state = sucesss;
Это правильный подход? Если да, то прямой запрос таблицы task_instance метаданных воздушного потока каждый раз, чтобы получить предыдущее или максимальное (start_datetime) задачи (s), будет иметь какие-либо последствия для производительности? Если да, то как получить предыдущее значение start_datetime и "успешное" состояние задачи через API airflow.models.taskinstance (https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html)
Благодаря!
1 ответ
Для начала важно понять, как
execution_date
работает, см. Scheduler Doc:
Планировщик не будет запускать ваши задачи до тех пор, пока не закончится период, который он охватывает, например, задание с schedule_interval, установленным как @daily, запускается после окончания дня. Этот метод гарантирует, что все данные, необходимые для этого периода, будут полностью доступны до выполнения dag. В пользовательском интерфейсе кажется, что Airflow выполняет ваши задачи на день позже.
Если вы запускаете DAG с интервалом schedule_interval в один день, запуск с execute_date 2019-11-21 запускается вскоре после 2019-11-21T23:59.
Повторим, планировщик запускает ваше задание через один schedule_interval ПОСЛЕ даты начала, в КОНЕЦ периода.
Это означает, ссылаясь на
execution_date
вы получите точное время, когда был запущен последний запуск.
Что касается запроса, я бы не стал запрашивать базу данных, чтобы получить дату последнего выполнения, а использовал бы макросы, которые поставляются из коробки с Airflow - см. Эту ссылку:
Вы должны просто использовать
{{ execution_date }}
в вашем запросе, и Airflow должен заменить его при запуске DAG.