Как установить модули зависимостей для задачи Airflow DAG (или кода Python)? , «Не удалось импортировать модуль» в DAG воздушного потока при использовании kuberentesExecutor
У меня есть DAG воздушного потока «example_ml.py», в котором есть задача «train_ml_model», и эта задача вызывает / запускает скрипт python «training.py».
-Dags / example_ml.py-Dags / training.py
Когда я запускаю Dag, он не может импортировать модули, необходимые для выполнения сценария обучения.
Фрагмент кода для задачи DAG:
train_model = PythonOperator(
task_id='train_model',
python_callable=training,
dag = dag
)
PS: Я использую кластер k8s. Воздушный поток работает в кластере k8s, а исполнитель установлен на kubernetesExecutor. Поэтому, когда запускается каждая группа DAG, для выполнения задачи назначается новый модуль.
4 ответа
Лучшие практики (насколько мне известно) — создать и предоставить собственный образ докера со всеми необходимыми зависимостями. Как только вы это сделаете, вы отправляете его в репозиторий по вашему выбору, а затем есть определенный набор параметров, который вы можете использовать в своем файле DAG, чтобы объявить, какой образ докера использовать для каждой задачи в вашей DAG, например:
def use_airflow_binary():
rc = os.system("airflow -h")
assert rc == 0
# You don't have to use any special KubernetesExecutor configuration if you don't want to
start_task = PythonOperator(
task_id="start_task", python_callable=print_stuff, dag=dag
)
# But you can if you want to
one_task = PythonOperator(
task_id="one_task", python_callable=print_stuff, dag=dag,
executor_config={"KubernetesExecutor": {"image": "airflow:latest"}}
)
# Use the airflow -h binary
two_task = PythonOperator(
task_id="two_task", python_callable=use_airflow_binary, dag=dag,
executor_config={"KubernetesExecutor": {"image": "airflow:latest"}}
)
Интересный момент здесь:
executor_config={"KubernetesExecutor": {"image": "airflow:latest"}}
Здесь вы можете использовать созданный вами собственный образ докера.
Доступно еще больше возможностей (распределение ресурсов, сходство и т. д.)
Все кредиты принадлежат Марку Ламберти
Я нашел простой (и уродливый) обходной путь для этой проблемы в моем случае использования.
Проблема, которую я решаю, заключается в простой установке пакета в среде, которую трудно найти и которой трудно управлять, поскольку она расположена глубоко внутри контейнеров и путей.
Моя причина найти это решение заключается в том, что мы можем запускать код Python внутри этой среды, которая иначе недоступна с помощью pip-команд.
Что я сделал, так это установил пакет непосредственно из самой DAG, вот как выглядит моя функция DAG:
def dag_function(**kwargs):
import sys
import subprocess
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'packagename'])
import packagename
# ... Use package
Источник для установки пакетов из скрипта Python: https://www.activestate.com/resources/quick-reads/how-to-install-python-packages-using-a-script/
Это нарушает различные принципы, один из которых заключается в том, чтобы все ваши требования отделялись от кода, но он работает без необходимости перекомпилировать целые изображения, так что это приятно.
Не могли бы вы подробнее рассказать? Вы запускаете это на своем локальном компьютере? Контейнер? Вы уверены, что пакет установлен? Как вы прокомментировали, ошибка, похоже, связана с отсутствующим пакетом. Создание задачи для установки может не решить проблему. В идеале просто установите требования на все, что вы используете, воздушный поток.
У меня была такая же проблема, и вот как я ее решил:
запуск следующего кода Python
>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
Я получаю эти пути
'/home/.local/lib/python3.6/site-packages',
'/usr/local/lib/python3.6/dist-packages',
'/usr/lib/python3/dist-packages'
для меня воздушный поток указан в
'/usr/local/lib/python3.6/dist-packages'
поэтому, чтобы пакет можно было найти, его нужно установить прямо здесь. Я использовал эту команду для установки своего пакета:
sudo python3 -m pip install -system [package-name] -t $(pwd)