Сохраните динамически созданный DAG в Airflow вместо того, чтобы регистрировать его в планировщике

Я хотел бы сохранить свои динамические DAG-файлы и не планировать их автоматически планировщиком Airflow, поэтому я не использую утилиту globals(). Есть способ? Я видел pickling_dag, но у меня это не работает. Я хочу просто увидеть динамически созданную группу DAG и сохранить ее, а не просто автоматически планировать в Airflow. Я читаю некоторые конфиги Spark из файла конфигурации yaml. Это код:

      
def create_dag(dag_id,
               schedule,
               dag_number,
               default_args,
               session=None):

    with open("/Users/conf.yaml") as stream:
        try:
            t3Params = yaml.safe_load(stream)
        except yaml.YAMLError as err:
            print(str(err))

    dag = DAG(
        dag_id,
        is_paused_upon_creation=False,
        default_args=default_args,
        description='A dynamic DAG generated from conf files',
        schedule_interval=schedule,
    )

    myt3Params = {
        'queue': t3Params['conf']['queue'],
        'exe_mem': t3Params['conf']['num_exe'],
        'exe_core': t3Params['conf']['exe_core']
    }

    t3Cmd = "ssh myCluster@127.10.10.1 bash /home/myCluser/test-runner.sh {{ params.queue }} {{ params.exe_mem }} {{ params.exe_core }}"

    task_id="xyz"
    with dag:
        t3 = BashOperator(task_id='test_spark_submit3', bash_command=t3Cmd, params=myt3Params, dag=dag)

    dag_pickle = DagPickle(t3)
    pickle.dump(dag, open("/Users/airflow/dags/pickled_dags", 'wb'))

    session.add(dag_pickle)
    session.commit()


    return dag

for n in range(1, 5):
    dag_id = 'saved_dynamic_conf_dags_{}'.format(str(n))

    default_args = {
        'owner': 'me',
        'depends_on_past': False,
        'start_date': datetime(2020, 9, 23, 21, 00),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=1)
    }

    schedule = '35,36,37 * * * *'

    dag_number = n

    create_dag(dag_id,schedule,dag_number,default_args)

0 ответов

Другие вопросы по тегам