Как я могу сгенерировать динамические группы DAG Spark с помощью Airflow 2.0 из файла conf?

Я запускаю приложение Spark с помощью LivyOperator в Airflow. Но сейчас все мои аргументы жестко закодированы. Как лучше всего написать файл python для динамической генерации аналогичного DAG, используя вместо этого файл conf. Я просмотрел https://www.astronomer.io/guides/dynamically-generating-dags, но, похоже, здесь вообще не используется файл conf. Вот что у меня есть на данный момент.

      from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

JAR_FILE = 'hdfs://x.jar'
EXECUTE_CLASS = 'SomeClass'
JOB_CONFIG_FILE = 'hdfs://job.conf'
DB_CONF_FILE = 'hdfs://db.cfg'
INPUT_DIR = '/xyz_hdfs'
OUTPUT_DIR = '/tmp/airflow/xyz/'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 2, 11),
    'email': ['xxx'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(dag_id='Airflow-Livy',
          default_args=default_args,
          catchup=False,
          schedule_interval="0 * * * *")

app_variables = ["--input-dir", INPUT_DIR, "--output-dir", OUTPUT_DIR, "--env", "prod","--enable-coalesce-feature", "true", "--config", "db.cfg"]
default_conf = {
    "spark.driver.extraJavaOptions": f"-Dconfig.file={JOB_CONFIG_FILE}#some_conf.conf -Xss64m",
    "spark.driver.extraJavaOptions": f"-Dconfig.file={DB_CONF_FILE}#db.cfg -Xss64m",
    "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
    "spark.sql.codegen.wholeStage": "false",
    "spark.yarn.executor.memoryOverhead": "2g",
    "spark.network.timeout": "900s",
    "spark.sql.shuffle.partitions": 400,
    "spark.sql.broadcastTimeout": 600,
    "spark.maxRemoteBlockSizeFetchToMem": 2147480000,
    "livy.spark.master": "yarn",
    "livy.spark.deployMode": "cluster",
    "spark.yarn.queue": "randomQueueYarn"
}
t1 = LivyOperator(task_id='SomeTask',
                  proxy_user='proxy',
                  livy_conn_id='livy_conn_default',
                  file=JAR_FILE,
                  class_name=EXECUTE_CLASS,
                  conf=default_conf,
                  files=["hdfs://db.cfg", "hdfs://job.conf"],
                  args=app_variables,
                  executor_cores=3,
                  executor_memory='8g',
                  driver_memory='8g',
                  name='SomeTask',
                  dag=dag
                  )

Теперь, как я могу написать код генератора Dag с помощью утилиты create_dag в Airflow из файла conf, как показано ниже, который в конечном итоге выглядит так, как показано выше?

      name = "SomeTask"
priority= 10


job-config {
inputDir= "xxx"
outputDir="yyy"
.....
}

spark-config {
"spark.executor.cores" = 2
"spark.executor.memory" = 5g
"spark.executor.instances" = 2
"spark.sql.codegen.wholeStage" = false
.....

}


airfow-config {
pools=xxx
.....
}

Я был бы очень признателен за какой-нибудь рабочий образец кода или пример.

0 ответов

Другие вопросы по тегам