Экспорт переменных среды во время выполнения с потоком воздуха

В настоящее время я преобразую рабочие процессы, которые были реализованы в сценариях bash ранее, в группы Airflow DAG. В скриптах bash я просто экспортировал переменные во время выполнения с

export HADOOP_CONF_DIR="/etc/hadoop/conf"

Теперь я хотел бы сделать то же самое в Airflow, но пока не нашел решения для этого. Единственный обходной путь, который я нашел, - это установка переменных с помощью os.environ[VAR_NAME]='some_text' вне любого метода или оператора, но это означает, что они экспортируются в момент загрузки скрипта, а не во время выполнения.

Теперь, когда я пытаюсь позвонить os.environ[VAR_NAME] = 'some_text' в функции, которая вызывается PythonOperator, она не работает. Мой код выглядит так

def set_env():
    os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
    os.environ['PATH'] = "somePath:" + os.environ['PATH']
    os.environ['SPARK_HOME'] = "pathToSparkHome"
    os.environ['PYTHONPATH'] = "somePythonPath"
    os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
    os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()

set_env_operator = PythonOperator(
    task_id='set_env_vars_NOT_WORKING',
    python_callable=set_env,
    dag=dag)

Теперь, когда мой SparkSubmitOperator выполняется, я получаю исключение:

Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

Мой случай использования, где это уместно, что у меня есть SparkSubmitOperator, где я отправляю работу в YARN, поэтому либо HADOOP_CONF_DIR или же YARN_CONF_DIR должен быть установлен в окружающей среде. Устанавливая их в мой .bashrc или любой другой конфиг, к сожалению, для меня невозможен, поэтому мне нужно установить их во время выполнения.

Желательно, чтобы я установил их в Операторе перед выполнением SparkSubmitOperator, но если бы была возможность передать их в качестве аргументов SparkSubmitOperator, это было бы по крайней мере что-то.

2 ответа

Из того, что я вижу в операторе spark submit, вы можете передать переменные окружения в spark-submit в виде словаря.

:param env_vars: Environment variables for spark-submit. It
                 supports yarn and k8s mode too.
:type env_vars: dict

Вы пробовали это?

Согласно этому ответу переменные должны быть помещены в /etc/default/airflow (в Debian / Ubuntu) или /etc/sysconfig/airflow(на Centos / Redhat) в агенте Airflow. Тогда они должны быть доступны при звонке os.environ[].

Другие вопросы по тегам