Airflow не будет писать логи на s3
Я пробовал разные способы настроить Airflow 1.9 для записи логов в s3, однако он просто игнорирует это. После этого я обнаружил, что у многих людей возникают проблемы с чтением журналов, однако моя проблема заключается в том, что журналы остаются локальными. Я могу прочитать их без проблем, но они не находятся в указанном ведре s3.
Сначала я попытался записать файл airflow.cfg.
# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = s3://bucketname/logs
remote_log_conn_id = aws
encrypt_s3_logs = False
Затем я попытался установить переменные среды
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucketname/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws
AIRFLOW__CORE__ENCRYPT_S3_LOGS=False
Однако это игнорируется, и файлы журнала остаются локальными.
Я запускаю поток воздуха из контейнера, я адаптировал https://github.com/puckel/docker-airflow к своему случаю, но он не будет записывать логи в s3. Я использую соединение aws для записи в сегменты в dags, и это работает, но журналы просто остаются локальными, независимо от того, запускаю ли я их на EC2 или локально на моей машине.
4 ответа
Я наконец-то нашел ответ, используя /questions/27349552/airflow-19-ne-udaetsya-poluchit-logi-dlya-zapisi-v-s3/27349560#27349560 это большая часть работы, которую мне тогда пришлось объявить еще на один шаг. Я воспроизвожу здесь этот ответ и немного адаптирую его так, как я это сделал:
Некоторые вещи, чтобы проверить:
- Убедитесь, что у вас есть
log_config.py
файл и он находится в правильном каталоге:./config/log_config.py
, - Убедитесь, что вы не забыли
__init__.py
файл в этом каталоге. - Убедитесь, что вы определили
s3.task
обработчик и установите его форматерairflow.task
- Убедитесь, что вы установили обработчики airflow.task и airflow.task_runner в s3.task
- Задавать
task_log_reader = s3.task
вairflow.cfg
- Пройти
S3_LOG_FOLDER
вlog_config
, Я сделал это с помощью переменной и извлек ее, как в следующемlog_config.py
,
Вот log_config.py, который работает:
import os
from airflow import configuration as conf
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')
LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'airflow.task': {
'format': LOG_FORMAT,
},
'airflow.processor': {
'format': LOG_FORMAT,
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'airflow.task',
'stream': 'ext://sys.stdout'
},
'file.task': {
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'filename_template': FILENAME_TEMPLATE,
},
'file.processor': {
'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
'formatter': 'airflow.processor',
'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
},
's3.task': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder': S3_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
},
'loggers': {
'': {
'handlers': ['console'],
'level': LOG_LEVEL
},
'airflow': {
'handlers': ['console'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.processor': {
'handlers': ['file.processor'],
'level': LOG_LEVEL,
'propagate': True,
},
'airflow.task': {
'handlers': ['s3.task'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.task_runner': {
'handlers': ['s3.task'],
'level': LOG_LEVEL,
'propagate': True,
},
}
}
Обратите внимание, что таким образом S3_LOG_FOLDER
можно указать в airflow.cfg
или в качестве переменной среды AIRFLOW__CORE__S3_LOG_FOLDER
,
Если это поможет кому-то другому, вот что сработало для меня, ответил в аналогичном сообщении: /questions/15907398/nastrojka-s3-dlya-logov-v-potoke-vozduha/64093506#64093506
Еще одна вещь, которая приводит к такому поведению (Airflow 1.10):
Если вы посмотрите на airflow.utils.log.s3_task_handler.S3TaskHandler
вы заметите, что есть несколько условий, при которых журналы не будут записываться в S3:
1) Экземпляр регистратора уже close()
d (не уверен, как это происходит на практике)
2) Файл журнала не существует на локальном диске (вот как я дошел до этого момента)
Вы также заметите, что регистратор работает в многопроцессорной / многопоточной среде, и что Airflow S3TaskHandler
а также FileTaskHandler
делать какие-то вещи с файловой системой. Если предположения о файлах журналов на диске будут выполнены, файлы журналов S3 не будут записываться, и ничего не регистрируется и не выдается об этом событии. Если у вас есть конкретные, четко определенные потребности в ведении журнала, было бы неплохо реализовать все свои собственные logging
Handlers
(см. питон logging
документы) и отключите все обработчики журнала Airflow (см. Airflow UPDATING.md
).
Еще одна вещь, которая может привести к такому поведению - Botocore может быть не установлен. Убедитесь, что при установке воздушного потока, чтобы включить пакет S3 pip install apache-airflow[s3]