# Конфигурация нескольких BranchPythonOperator в одном DAG
import logging
import pandas as pd
import boto3
from datetime import datetime
from airflow import DAG, settings
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from airflow.models import Variable, Connection
from airflow.utils.trigger_rule import TriggerRule
from settings import settings_manager
from config.misc import misc_config
logger = logging.getLogger(__name__)
list_tables_query = f"""SHOW TABLES IN SCHEMA {settings_manager.SNOWFLAKE_DATABASE}.STAGING"""
overwrite_variable_name = 'overwrite_staging_tables'
dag = DAG("v4_functional_table_creation", schedule_interval=None,
start_date=datetime(2019, 2, 20))
start = DummyOperator(task_id='start',
dag=dag)
def print_str(file):
logger.info(file)
def list_existing_tables():
# left out for brevity
return table_list
def does_table_exists(file, table_name, list_of_tables):
if table_name.upper() in list_of_tables:
return f"""overwrite_check_for_{table_name}"""
else:
return f"""get_latest_{file}_file"""
def overwrite_check(source_name, table_name):
overwrite = 'False'
if overwrite == 'True':
return f"""drop_{source_name}.{table_name}"""
else:
return "end"
def create_table(table_name, file_path):
logger.info(f"""creating table {table_name} using {file_path} file.""")
def get_latest_uploaded_file(file, source):
"""
Given a s3 prefix, returns path of the latest uploaded file.
"""
# left out for brevity
return latest.get('Key', '')
list_existing_tables = PythonOperator(task_id="list_existing_tables",
python_callable=list_existing_tables,
dag=dag)
end = DummyOperator(task_id='end',
dag=dag)
start >> list_existing_tables
for source in misc_config.get('s3_sources_to_parse'):
file_list_str = Variable.get(f"""file_list_{source}""")
file_list_str = file_list_str[2:-2]
file_list = file_list_str.split(',')
for file_str in file_list:
file = file_str.strip(" ").strip('"').strip("'")
table_name = f"""{source}_{file}"""
check_table_exists = BranchPythonOperator(task_id=f"""check_{table_name}_exists""",
python_callable=does_table_exists,
op_kwargs={'table_name': table_name,
'list_of_tables': list_existing_tables.output,
'file': file},
dag=dag)
check_overwrite_condition = BranchPythonOperator(task_id=f"""overwrite_check_for_{table_name}""",
python_callable=overwrite_check,
op_kwargs={'source_name': source,
'table_name': table_name},
dag=dag)
get_latest_file = PythonOperator(task_id=f"""get_latest_{file}_file""",
python_callable=get_latest_uploaded_file,
op_kwargs={'file': file,
'source': source},
trigger_rule='none_failed_or_skipped',
dag=dag)
drop_table = PythonOperator(task_id=f"""drop_{table_name}""",
python_callable=print_str,
op_kwargs={'file': f"""dropping_{table_name}"""},
trigger_rule='none_failed_or_skipped',
dag=dag)
create_table_task = PythonOperator(task_id=f"""create_{table_name}""",
python_callable=create_table,
op_kwargs={'table_name': table_name,
'file_path': get_latest_file.output},
dag=dag)
list_existing_tables >> check_table_exists >> [check_overwrite_condition, get_latest_file]
check_overwrite_condition >> [drop_table, end]
drop_table >> get_latest_file >> create_table_task >> end
# Для группы DAG, объявленной в нижней части файла (см. прилагаемый
# снимок экрана), я ожидаю, что задачи get_latest_waffle_switch_file и
# create_sos_waffle_switch будут пропущены. Я не понимаю, как и почему
# запускается задача get_latest_waffle_switch_file.
# Думаю, дело в правильном объявлении зависимостей между задачами DAG.
# Любые подсказки будут очень полезны! Заранее спасибо.
# _/\_
# [1]: https://stackru.com/images/76dfed5fdeaa881898715541630f3a0a7036f88f.png