Экспорт переменных среды во время выполнения с потоком воздуха
В настоящее время я преобразую рабочие процессы, которые были реализованы в сценариях bash ранее, в группы Airflow DAG. В скриптах bash я просто экспортировал переменные во время выполнения с
export HADOOP_CONF_DIR="/etc/hadoop/conf"
Теперь я хотел бы сделать то же самое в Airflow, но пока не нашел решения для этого. Единственный обходной путь, который я нашел, - это установка переменных с помощью os.environ[VAR_NAME]='some_text'
вне любого метода или оператора, но это означает, что они экспортируются в момент загрузки скрипта, а не во время выполнения.
Теперь, когда я пытаюсь позвонить os.environ[VAR_NAME] = 'some_text'
в функции, которая вызывается PythonOperator, она не работает. Мой код выглядит так
def set_env():
os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
os.environ['PATH'] = "somePath:" + os.environ['PATH']
os.environ['SPARK_HOME'] = "pathToSparkHome"
os.environ['PYTHONPATH'] = "somePythonPath"
os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()
set_env_operator = PythonOperator(
task_id='set_env_vars_NOT_WORKING',
python_callable=set_env,
dag=dag)
Теперь, когда мой SparkSubmitOperator выполняется, я получаю исключение:
Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
Мой случай использования, где это уместно, что у меня есть SparkSubmitOperator
, где я отправляю работу в YARN, поэтому либо HADOOP_CONF_DIR
или же YARN_CONF_DIR
должен быть установлен в окружающей среде. Устанавливая их в мой .bashrc
или любой другой конфиг, к сожалению, для меня невозможен, поэтому мне нужно установить их во время выполнения.
Желательно, чтобы я установил их в Операторе перед выполнением SparkSubmitOperator
, но если бы была возможность передать их в качестве аргументов SparkSubmitOperator
, это было бы по крайней мере что-то.
2 ответа
Из того, что я вижу в операторе spark submit, вы можете передать переменные окружения в spark-submit в виде словаря.
:param env_vars: Environment variables for spark-submit. It
supports yarn and k8s mode too.
:type env_vars: dict
Вы пробовали это?
Согласно этому ответу переменные должны быть помещены в
/etc/default/airflow
(в Debian / Ubuntu) или
/etc/sysconfig/airflow
(на Centos / Redhat) в агенте Airflow. Тогда они должны быть доступны при звонке
os.environ[]
.