Использование одного знака на нескольких входах в Airflow

Я пытаюсь реализовать конвейер в Airflow, который будет многократно запускаться для разных входных файлов. Я хочу отправить в AWS несколько таких заданий, чтобы они могли работать параллельно.

Может ли Airflow обрабатывать планирование и представление для этого? (и каков эффективный способ Airflow сделать это?)

0 ответов

Я не использовал Airflow с AWS, но, безусловно, возможно инициировать несколько запусков одного и того же рабочего процесса DAG.

import uuid
import logging
from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils.state import State
from airflow.operators.python_operator import PythonOperator

logger = logging.getLogger(__name__)

SCHEDULE = timedelta(seconds=10)


default_args = dict(
    owner='Airflow Demo',
    queue='demo_queue',
)

scheduling_dag = DAG(
    dag_id='scheduling_dag',
    default_args=default_args,
    start_date=datetime.utcnow() - SCHEDULE,
    schedule_interval=SCHEDULE,
)


def trigger_dag_run(**op_kwargs):
    for conf in [dict(filename='file1.txt'), dict(filename='file2.txt')]:
        workflow.create_dagrun(
            run_id='wf_{}_{}'.format(
                datetime.now().isoformat(), uuid.uuid4().hex),
            state=State.RUNNING,
            conf=conf,
            external_trigger=True
        )


trigger = PythonOperator(
    task_id='trigger',
    python_callable=trigger_dag_run,
    dag=scheduling_dag
)


workflow = DAG(
    dag_id='workflow',
    default_args=default_args,
    start_date=datetime.utcnow(),
    schedule_interval=None
)


def py_callable(**op_kwargs):
    logger.info('DAG run for: {}'.format(
        op_kwargs['dag_run'].conf['filename']))


task = PythonOperator(
    task_id='task',
    python_callable=py_callable,
    provide_context=True,
    dag=workflow
)
Другие вопросы по тегам