Использование одного знака на нескольких входах в Airflow
Я пытаюсь реализовать конвейер в Airflow, который будет многократно запускаться для разных входных файлов. Я хочу отправить в AWS несколько таких заданий, чтобы они могли работать параллельно.
Может ли Airflow обрабатывать планирование и представление для этого? (и каков эффективный способ Airflow сделать это?)
0 ответов
Я не использовал Airflow с AWS, но, безусловно, возможно инициировать несколько запусков одного и того же рабочего процесса DAG.
import uuid
import logging
from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils.state import State
from airflow.operators.python_operator import PythonOperator
logger = logging.getLogger(__name__)
SCHEDULE = timedelta(seconds=10)
default_args = dict(
owner='Airflow Demo',
queue='demo_queue',
)
scheduling_dag = DAG(
dag_id='scheduling_dag',
default_args=default_args,
start_date=datetime.utcnow() - SCHEDULE,
schedule_interval=SCHEDULE,
)
def trigger_dag_run(**op_kwargs):
for conf in [dict(filename='file1.txt'), dict(filename='file2.txt')]:
workflow.create_dagrun(
run_id='wf_{}_{}'.format(
datetime.now().isoformat(), uuid.uuid4().hex),
state=State.RUNNING,
conf=conf,
external_trigger=True
)
trigger = PythonOperator(
task_id='trigger',
python_callable=trigger_dag_run,
dag=scheduling_dag
)
workflow = DAG(
dag_id='workflow',
default_args=default_args,
start_date=datetime.utcnow(),
schedule_interval=None
)
def py_callable(**op_kwargs):
logger.info('DAG run for: {}'.format(
op_kwargs['dag_run'].conf['filename']))
task = PythonOperator(
task_id='task',
python_callable=py_callable,
provide_context=True,
dag=workflow
)