Как я могу сгенерировать динамические группы DAG Spark с помощью Airflow 2.0 из файла conf?
Я запускаю приложение Spark с помощью LivyOperator в Airflow. Но сейчас все мои аргументы жестко закодированы. Как лучше всего написать файл python для динамической генерации аналогичного DAG, используя вместо этого файл conf. Я просмотрел https://www.astronomer.io/guides/dynamically-generating-dags, но, похоже, здесь вообще не используется файл conf. Вот что у меня есть на данный момент.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
JAR_FILE = 'hdfs://x.jar'
EXECUTE_CLASS = 'SomeClass'
JOB_CONFIG_FILE = 'hdfs://job.conf'
DB_CONF_FILE = 'hdfs://db.cfg'
INPUT_DIR = '/xyz_hdfs'
OUTPUT_DIR = '/tmp/airflow/xyz/'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 2, 11),
'email': ['xxx'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(dag_id='Airflow-Livy',
default_args=default_args,
catchup=False,
schedule_interval="0 * * * *")
app_variables = ["--input-dir", INPUT_DIR, "--output-dir", OUTPUT_DIR, "--env", "prod","--enable-coalesce-feature", "true", "--config", "db.cfg"]
default_conf = {
"spark.driver.extraJavaOptions": f"-Dconfig.file={JOB_CONFIG_FILE}#some_conf.conf -Xss64m",
"spark.driver.extraJavaOptions": f"-Dconfig.file={DB_CONF_FILE}#db.cfg -Xss64m",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"spark.sql.codegen.wholeStage": "false",
"spark.yarn.executor.memoryOverhead": "2g",
"spark.network.timeout": "900s",
"spark.sql.shuffle.partitions": 400,
"spark.sql.broadcastTimeout": 600,
"spark.maxRemoteBlockSizeFetchToMem": 2147480000,
"livy.spark.master": "yarn",
"livy.spark.deployMode": "cluster",
"spark.yarn.queue": "randomQueueYarn"
}
t1 = LivyOperator(task_id='SomeTask',
proxy_user='proxy',
livy_conn_id='livy_conn_default',
file=JAR_FILE,
class_name=EXECUTE_CLASS,
conf=default_conf,
files=["hdfs://db.cfg", "hdfs://job.conf"],
args=app_variables,
executor_cores=3,
executor_memory='8g',
driver_memory='8g',
name='SomeTask',
dag=dag
)
Теперь, как я могу написать код генератора Dag с помощью утилиты create_dag в Airflow из файла conf, как показано ниже, который в конечном итоге выглядит так, как показано выше?
name = "SomeTask"
priority= 10
job-config {
inputDir= "xxx"
outputDir="yyy"
.....
}
spark-config {
"spark.executor.cores" = 2
"spark.executor.memory" = 5g
"spark.executor.instances" = 2
"spark.sql.codegen.wholeStage" = false
.....
}
airfow-config {
pools=xxx
.....
}
Я был бы очень признателен за какой-нибудь рабочий образец кода или пример.