Сохраните динамически созданный DAG в Airflow вместо того, чтобы регистрировать его в планировщике
Я хотел бы сохранить свои динамические DAG-файлы и не планировать их автоматически планировщиком Airflow, поэтому я не использую утилиту globals(). Есть способ? Я видел pickling_dag, но у меня это не работает. Я хочу просто увидеть динамически созданную группу DAG и сохранить ее, а не просто автоматически планировать в Airflow. Я читаю некоторые конфиги Spark из файла конфигурации yaml. Это код:
def create_dag(dag_id,
schedule,
dag_number,
default_args,
session=None):
with open("/Users/conf.yaml") as stream:
try:
t3Params = yaml.safe_load(stream)
except yaml.YAMLError as err:
print(str(err))
dag = DAG(
dag_id,
is_paused_upon_creation=False,
default_args=default_args,
description='A dynamic DAG generated from conf files',
schedule_interval=schedule,
)
myt3Params = {
'queue': t3Params['conf']['queue'],
'exe_mem': t3Params['conf']['num_exe'],
'exe_core': t3Params['conf']['exe_core']
}
t3Cmd = "ssh myCluster@127.10.10.1 bash /home/myCluser/test-runner.sh {{ params.queue }} {{ params.exe_mem }} {{ params.exe_core }}"
task_id="xyz"
with dag:
t3 = BashOperator(task_id='test_spark_submit3', bash_command=t3Cmd, params=myt3Params, dag=dag)
dag_pickle = DagPickle(t3)
pickle.dump(dag, open("/Users/airflow/dags/pickled_dags", 'wb'))
session.add(dag_pickle)
session.commit()
return dag
for n in range(1, 5):
dag_id = 'saved_dynamic_conf_dags_{}'.format(str(n))
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2020, 9, 23, 21, 00),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
schedule = '35,36,37 * * * *'
dag_number = n
create_dag(dag_id,schedule,dag_number,default_args)