Airflow S3KeySensor - Как заставить его продолжать работать
С помощью этого поста Stackru я только что создал программу (ту, которая показана в посте), где, когда файл помещается в корзину S3, запускается задача в одном из моих работающих DAG, а затем я выполняю некоторую работу с помощью BashOperator. После того, как это сделано, хотя группа обеспечения доступности баз данных больше не находится в рабочем состоянии, а вместо этого переходит в состояние успеха, и если я хочу, чтобы он взял другой файл, мне нужно очистить все "Прошлое", "Будущее", "Восходящий поток", " Нисходящая деятельность. Я хотел бы сделать эту программу так, чтобы она всегда работала и каждый раз, когда новый файл помещался в корзину S3, программа запускала задачи.
Могу ли я продолжать использовать S3KeySenor для этого или мне нужно найти способ настройки внешнего триггера для запуска моей группы DAG? На данный момент мой S3KeySensor довольно бессмыслен, если он когда-либо будет запущен только один раз.
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 5, 29),
'email': ['something@here.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')
# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
task_id='create_emr_cluster_1',
bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
retries=1,
dag=dag)
t1 = BashOperator(
task_id='success_log',
bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
dag=dag)
sensor = S3KeySensor(
task_id='new_s3_file_in_foobar-bucket',
bucket_key='*',
wildcard_match=True,
bucket_name='foobar-bucket',
s3_conn_id='s3://foobar-bucket',
timeout=18*60*60,
poke_interval=120,
dag=dag)
t1.set_upstream(sensor)
t2.set_upstream(t1)
Мне интересно, если это невозможно, потому что тогда это был бы не Направленный ациклический граф, а скорее он имел бы цикл, который повторял датчик -> t1 -> t2 -> датчик -> t1 -> t2 -> датчик -> ... продолжайте повторять.
Обновить:
Мой пример использования довольно прост: каждый раз, когда новый файл помещается в назначенное ведро AWS S3, я хочу, чтобы мой DAG запускался и запускал процесс выполнения различных задач. Задачи будут выполнять такие вещи, как создание нового кластера AWS EMR, извлечение файлов из корзины AWS S3, выполнение некоторых операций AWS EMR, а затем завершение работы кластера AWS EMR. После этого группа обеспечения доступности баз данных вернется в состояние ожидания, где она будет ожидать поступления новых файлов в корзину AWS S3, а затем повторять процесс бесконечно долго.
2 ответа
В Airflow нет концепции, которая бы отображала постоянно работающую группу DAG. DAG может запускаться очень часто, например, каждые 1–5 минут, если это подходит для вашего случая использования.
Главное здесь заключается в том, что S3KeySensor проверяет, пока не обнаружит, что первый файл существует в подстановочном пути ключа (или в тайм-ауте), а затем запускается. Но когда появится второй, третий или четвертый файл, датчик S3 уже завершит работу для этого прогона DAG. Запланированный запуск не будет выполнен до следующего запуска DAG. (Идея зацикливания, которую вы описали, примерно эквивалентна тому, что делает планировщик при создании прогонов DAG, но не навсегда.)
Внешний триггер определенно звучит как лучший подход для вашего варианта использования, независимо от того, происходит ли этот триггер с помощью команды trigger_dag CLI Airflow ($ airflow trigger_dag ...
):
Или через REST API:
Обернитесь и назовите trigger_dag
функция в общем (экспериментальном) API:
Например, можно настроить лямбда-функцию AWS, вызываемую при попадании файла на S3, которая запускает триггерный вызов DAG.
Другой способ — использовать триггер S3 для лямбды aws, которая будет вызывать DAG с помощью API.
событие s3 -> aws lambda -> API воздушного потока
Настройте уведомление S3 для запуска лямбда
https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
API воздушного потока
https://airflow.apache.org/docs/apache-airflow/stable/rest-api-ref.html