Airflow 1.9 - Не удается получить логи для записи в s3
Я запускаю воздушный поток 1.9 в kubernetes в AWS. Я бы хотел, чтобы логи перешли на s3, так как сами контейнеры с воздушным потоком недолговечны.
Я прочитал различные темы и документы, которые описывают процесс, но я все еще не могу заставить его работать. Сначала тест, который показывает мне, что конфигурация и разрешения s3 действительны. Это выполняется на одном из наших рабочих экземпляров.
Используйте поток воздуха для записи в файл s3
airflow@airflow-worker-847c66d478-lbcn2:~$ id
uid=1000(airflow) gid=1000(airflow) groups=1000(airflow)
airflow@airflow-worker-847c66d478-lbcn2:~$ env |grep s3
AIRFLOW__CONN__S3_LOGS=s3://vevo-dev-us-east-1-services-airflow/logs/
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://vevo-dev-us-east-1-services-airflow/logs/
airflow@airflow-worker-847c66d478-lbcn2:~$ python
Python 3.6.4 (default, Dec 21 2017, 01:37:56)
[GCC 4.9.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import airflow
>>> s3 = airflow.hooks.S3Hook('s3_logs')
/usr/local/lib/python3.6/site-packages/airflow/utils/helpers.py:351: DeprecationWarning: Importing S3Hook directly from <module 'airflow.hooks' from '/usr/local/lib/python3.6/site-packages/airflow/hooks/__init__.py'> has been deprecated. Please import from '<module 'airflow.hooks' from '/usr/local/lib/python3.6/site-packages/airflow/hooks/__init__.py'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
DeprecationWarning)
>>> s3.load_string('put this in s3 file', airflow.conf.get('core', 'remote_base_log_folder') + "/airflow-test")
[2018-02-23 18:43:58,437] {{base_hook.py:80}} INFO - Using connection to: vevo-dev-us-east-1-services-airflow
Теперь давайте возьмем файл из s3 и посмотрим на его содержимое. Мы видим, что все выглядит хорошо здесь.
root@4f8171d4fe47:/# aws s3 cp s3://vevo-dev-us-east-1-services-airflow/logs//airflow-test .
download: s3://vevo-dev-us-east-1-services-airflow/logs//airflow-test to ./airflow-test
root@4f8171d4fe47:/# cat airflow-test
put this in s3 fileroot@4f8171d4fe47:/stringer#
Таким образом, кажется, что соединение с воздушным потоком s3 хорошее, за исключением того, что задания воздушного потока не используют s3 для регистрации. Вот мои настройки, которые я считаю, что-то не так или я что-то упускаю.
Env различные запущенные экземпляры рабочий / планировщик / мастер
airflow@airflow-worker-847c66d478-lbcn2:~$ env |grep -i s3
AIRFLOW__CONN__S3_LOGS=s3://vevo-dev-us-east-1-services-airflow/logs/
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://vevo-dev-us-east-1-services-airflow/logs/
S3_BUCKET=vevo-dev-us-east-1-services-airflow
Это показывает, что соединение s3_logs существует в потоке воздуха
airflow@airflow-worker-847c66d478-lbcn2:~$ airflow connections -l|grep s3
│ 's3_logs' │ 's3' │ 'vevo-dev-
us-...vices-airflow' │ None │ False │ False │ None │
Я поместил этот файл https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py в свое изображение докера. Вы можете увидеть пример здесь на одном из наших работников
airflow@airflow-worker-847c66d478-lbcn2:~$ ls -al /usr/local/airflow/config/
total 32
drwxr-xr-x. 2 root root 4096 Feb 23 00:39 .
drwxr-xr-x. 1 airflow airflow 4096 Feb 23 00:53 ..
-rw-r--r--. 1 root root 4471 Feb 23 00:25 airflow_local_settings.py
-rw-r--r--. 1 root root 0 Feb 16 21:35 __init__.py
Мы отредактировали файл, чтобы определить переменную REMOTE_BASE_LOG_FOLDER. Вот разница между нашей версией и вышестоящей версией
index 899e815..897d2fd 100644
--- a/var/tmp/file
+++ b/config/airflow_local_settings.py
@@ -35,7 +35,8 @@ PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
# Storage bucket url for remote logging
# s3 buckets should start with "s3://"
# gcs buckets should start with "gs://"
-REMOTE_BASE_LOG_FOLDER = ''
+REMOTE_BASE_LOG_FOLDER = conf.get('core', 'remote_base_log_folder')
+
DEFAULT_LOGGING_CONFIG = {
'version': 1,
Здесь вы можете видеть, что на одном из наших работников установлены правильные настройки.
>>> import airflow
>>> airflow.conf.get('core', 'remote_base_log_folder')
's3://vevo-dev-us-east-1-services-airflow/logs/'
На основании того факта, что REMOTE_BASE_LOG_FOLDER начинается с 's3', а REMOTE_LOGGING имеет значение True
>>> airflow.conf.get('core', 'remote_logging')
'True'
Я ожидаю, что этот блок https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py оценивается как true и заставляет журналы переходить на s3.
Пожалуйста, может кто-нибудь, у кого логирование s3 работает на 1.9, указать, что мне не хватает? Я хотел бы представить PR в вышестоящий проект для обновления документации, так как это кажется довольно распространенной проблемой, и насколько я могу судить, вышестоящие документы не действительны или как-то часто неправильно интерпретируются.
Спасибо! Г.
1 ответ
Да, у меня также были проблемы с настройкой только на основе документов. Я должен был пройти код воздушного потока, чтобы понять это. Есть несколько вещей, которые вы не могли бы сделать.
Некоторые вещи, чтобы проверить:
1. Убедитесь, что у вас есть файл log_config.py, и он находится в правильном каталоге: ./config/log_config.py. Также убедитесь, что вы не забыли файл __init__.py в этом каталоге.
2. Убедитесь, что вы определили обработчик s3.task и установите для его форматера значение airflow.task
3. Убедитесь, что для обработчиков airflow.task и airflow.task_runner установлено значение s3.task.
Вот файл log_config.py, который работает для меня:
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from airflow import configuration as conf
# TO DO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
S3_LOG_FOLDER = 's3://your_path_to_airflow_logs'
LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'airflow.task': {
'format': LOG_FORMAT,
},
'airflow.processor': {
'format': LOG_FORMAT,
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'airflow.task',
'stream': 'ext://sys.stdout'
},
'file.task': {
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'filename_template': FILENAME_TEMPLATE,
},
'file.processor': {
'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
'formatter': 'airflow.processor',
'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
},
# When using s3 or gcs, provide a customized LOGGING_CONFIG
# in airflow_local_settings within your PYTHONPATH, see UPDATING.md
# for details
's3.task': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder': S3_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
# 'gcs.task': {
# 'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
# 'formatter': 'airflow.task',
# 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
# 'gcs_log_folder': GCS_LOG_FOLDER,
# 'filename_template': FILENAME_TEMPLATE,
# },
},
'loggers': {
'': {
'handlers': ['console'],
'level': LOG_LEVEL
},
'airflow': {
'handlers': ['console'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.processor': {
'handlers': ['file.processor'],
'level': LOG_LEVEL,
'propagate': True,
},
'airflow.task': {
'handlers': ['s3.task'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.task_runner': {
'handlers': ['s3.task'],
'level': LOG_LEVEL,
'propagate': True,
},
}
}
При развертывании на k8 с официальной диаграммой управления мне пришлось добавить конфигурацию удаленного ведения журнала также в рабочие модули. Так что этого было недостаточно:
AIRFLOW__CORE__REMOTE_LOGGING: True
AIRFLOW__CORE__REMOTE_LOG_CONN_ID: s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: 's3://my-log-bucket/logs'
Мне тоже пришлось передать эти вары рабочим
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_LOGGING: True
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_LOG_CONN_ID: s3_logs
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: 's3://my-log-bucket/logs'