Настройка s3 для логов в потоке воздуха

Я использую docker-compose для настройки масштабируемого кластера воздушного потока. Я основал свой подход на этом Dockerfile https://hub.docker.com/r/puckel/docker-airflow/

Моя проблема в том, чтобы настроить логи для записи / чтения с s3. Когда dag закончен, я получаю ошибку как это

*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.

*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00

Я создал новый раздел в airflow.cfg файл как это

[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx

А затем указал путь s3 в разделе удаленных журналов в airflow.cfg

remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn

Я правильно настроил, и есть ошибка? Есть ли здесь рецепт успеха, которого мне не хватает?

-- Обновить

Я пытался экспортировать в форматы URI и JSON, но ни один из них не работал. Затем я экспортировал aws_access_key_id и aws_secret_access_key, а затем поток воздуха начал подниматься. Теперь я получаю свою ошибку в рабочих логах

6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00

-- Обновить

Я также нашел эту ссылку https://www.mail-archive.com/dev@airflow.incubator.apache.org/msg00462.html

Затем я зашел на одну из моих рабочих машин (отдельно от веб-сервера и планировщика) и запустил этот фрагмент кода на python

import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))

Я получаю эту ошибку.

boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden

Я пытался экспортировать несколько разных типов AIRFLOW_CONN_ envs, как описано здесь, в разделе соединений https://airflow.incubator.apache.org/concepts.html и другими ответами на этот вопрос.

s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3

{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}

{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}

Я также экспортировал AWS_ACCESS_KEY_ID и AWS_SECRET_ACCESS_KEY безуспешно.

Эти учетные данные хранятся в базе данных, поэтому, как только я добавлю их в пользовательский интерфейс, они должны быть подобраны рабочими, но по какой-то причине они не могут писать / читать журналы.

8 ответов

Решение

Вам необходимо настроить соединение s3 через интерфейс воздушного потока. Для этого вам нужно перейти на вкладку Admin -> Connections в интерфейсе airflow и создать новую строку для вашего S3-соединения.

Пример конфигурации будет:

Conn Id: my_conn_S3

Тип соединения: S3

Дополнительно: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}

ОБНОВЛЕНИЕ Airflow 1.10 делает регистрацию намного проще.

Для ведения журнала s3, установите хук соединения согласно ответу выше

а затем просто добавьте следующее в airflow.cfg

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_base_log_folder = s3://my-bucket/path/to/logs
    remote_log_conn_id = MyS3Conn
    # Use server-side encryption for logs stored in S3
    encrypt_s3_logs = False

Для регистрации gcs,

  1. Сначала установите пакет gcp_api, например, так: pip install apache-airflow[gcp_api].

  2. Установите хук соединения согласно ответу выше

  3. Добавьте следующее в airflow.cfg

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_logging = True
    remote_base_log_folder = gs://my-bucket/path/to/logs
    remote_log_conn_id = MyGCSConn
    

ПРИМЕЧАНИЕ. По состоянию на Airflow 1.9 удаленное ведение журнала значительно изменилось. Если вы используете 1.9, читайте дальше.

Ссылка здесь

Полные инструкции:

  1. Создайте каталог для хранения конфигов и разместите его так, чтобы его можно было найти в PYTHONPATH. Одним из примеров является $AIRFLOW_HOME/config

  2. Создайте пустые файлы с именами $AIRFLOW_HOME/config/log_config.py и $AIRFLOW_HOME/config/__init__.py

  3. Скопируйте содержимое https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/config_templates/airflow_local_settings.py в файл log_config.py, который был только что создан на шаге выше.

  4. Настройте следующие части шаблона:

    #Add this variable to the top of the file. Note the trailing slash.
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
    
    Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
    LOGGING_CONFIG = ...
    
    Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    's3.task': {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': S3_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
    
     Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }
    
  5. Убедитесь, что в воздушном потоке был определен соединительный крючок s3 согласно ответу выше. Хук должен иметь доступ на чтение и запись к корзине s3, определенной выше в S3_LOG_FOLDER.

  6. Обновите $AIRFLOW_HOME/airflow.cfg, чтобы он содержал:

    task_log_reader = s3.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the s3 platform hook>
    
  7. Перезапустите веб-сервер и планировщик Airflow и запустите (или дождитесь) выполнение новой задачи.

  8. Убедитесь, что журналы отображаются для недавно выполненных задач в выбранной вами корзине.

  9. Убедитесь, что средство просмотра хранилища s3 работает в пользовательском интерфейсе. Подтяните вновь выполненную задачу и убедитесь, что вы видите что-то вроде:

    *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
    

Вот решение, если вы не хотите использовать интерфейс администратора.

Мой процесс развертывания Dockerized, и я никогда не касаюсь интерфейса администратора. Мне также нравится задавать переменные среды, специфичные для Airflow, в скрипте bash, который переопределяет файл.cfg.

расход воздуха [s3]

Прежде всего, вам нужно s3 установлен подпакет для записи ваших логов Airflow на S3. (boto3 отлично работает для заданий Python в ваших DAG, но S3Hook зависит от подпакета s3.)

Еще одно замечание: установка conda еще не справилась с этим, поэтому я должен сделать pip install airflow[s3],

Переменные среды

В скрипте bash я установил эти core переменные. Начиная с этих инструкций, но используя соглашение об именах AIRFLOW__{SECTION}__{KEY} для переменных среды я делаю:

export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

S3 ID соединения

s3_uri это идентификатор соединения, который я составил. В Airflow это соответствует другой переменной среды, AIRFLOW_CONN_S3_URI, Значение этого - ваш путь S3, который должен быть в форме URI. Это

s3://access_key:secret_key@bucket/key

Сохраните это, однако, вы обрабатываете другие конфиденциальные переменные среды

С этой конфигурацией Airflow запишет ваши логи на S3. Они пойдут по пути s3://bucket/key/dag/task_id,

Чтобы завершить ответ Арне с последними обновлениями Airflow, вам не нужно устанавливать task_log_reader на другое значение, отличное от значения по умолчанию: task

Как если бы вы следовали шаблону регистрации по умолчанию https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py вы можете увидеть после этой фиксации (обратите внимание, что имя обработчика изменилось на's3': {'task'... вместо s3.task) это значение в удаленной папке (REMOTE_BASE_LOG_FOLDER) заменит обработчик на правильный:

REMOTE_LOGGING = conf.get('core', 'remote_logging')

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])

Подробнее о том, как войти в систему / прочитать с S3: https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3

Фух! Мотивация продолжать пресекать ошибки воздушного потока в зародыше состоит в том, чтобы противостоять этому как кучке файлов python XD, вот мой опыт с apache-airflow==1.9.0

Во-первых, просто незачем пытаться airflow connections .......... --conn_extra и т. д. и т. д.

Просто установите свой airflow.cfg как:

remote_logging = True
remote_base_log_folder = s3://dev-s3-main-ew2-dmg-immutable-potns/logs/airflow-logs/
encrypt_s3_logs = False

# Logging level
logging_level = INFO
fab_logging_level = WARN

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.LOGGING_CONFIG
remote_log_conn_id = s3://<ACCESS-KEY>:<SECRET-ID>@<MY-S3-BUCKET>/<MY>/<SUB>/<FOLDER>/

сохраните файлы $AIRFLOW_HOME/config/__ init __.py и $AIRFLOW_HOME/config/log_config.py, как указано выше.

Проблема со мной в отсутствующем пакете "boto3", который я мог решить:

vi /usr/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py затем >> импортировать трассировку и в строке, содержащей:

Не удалось создать S3Hook с идентификатором соединения "%s". ' ' Убедитесь, что воздушный поток [s3] установлен и '' соединение S3 существует.

выполнив traceback.print_exc(), и он начал прикидываться по поводу отсутствия boto3!

Установил, и жизнь снова стала прекрасной!

Для airflow 2.3.4, используя Docker, я также столкнулся с проблемами ведения журнала на s3.

Сначала я столкнулся с некоторыми ошибками прав доступа (хотя моя роль IAM была настроена нормально), затем, немного изменив конфигурацию, я смог записать файлы в правильное место, но не смог прочитать (возврат к локальному журналу).

В любом случае, после многих усилий, отладки, проб и ошибок, вот что у меня сработало:

Определите соединение для s3 (при условии, что ваш регион такжеeu-west-1):

Либо через пользовательский интерфейс, и в этом случае вам нужно установить:

  • Идентификатор соединения:my-conn(или любое другое имя, которое вы предпочитаете),
  • Тип соединения:Amazon Web Services(это одно изменение, которое я сделал,s3мне не помогло)
  • Дополнительный:{"region_name": "eu-west-1", "endpoint_url": "https://s3.eu-west-1.amazonaws.com"}

Или через CLI:

      airflow connections add my-conn --conn-type aws --conn-extra '{"region_name": "eu-west-1", "endpoint_url": "https://s3.eu-west-1.amazonaws.com"}'

Что касается конфигурации воздушного потока, я установил ее во всех процессах:

      ...
export AIRFLOW__LOGGING__REMOTE_LOGGING=True
export AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=s3://my-bucket/path/to/log/folder
export AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID=my-conn
...

После развертывания я все еще получал такие ошибки, какFalling back to local log..., но в итоге файл загрузился и отобразился (после нескольких обновлений).

Хотя сейчас вроде работает нормально :)

Пусть он работает с Airflow 10 в кубе. У меня есть следующие наборы env var:

AIRFLOW_CONN_LOGS_S3=s3://id:secret_uri_encoded@S3
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://xxxx/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=logs_s3

Просто примечание для тех, кто следит за очень полезными инструкциями в ответе выше: Если вы наткнулись на эту проблему: "ModuleNotFoundError: Нет модуля с именем"airflow.utils.log.logging_mixin.RedirectStdHandler "", на который ссылаются здесь (что происходит при использовании airflow 1.9), исправить это просто - используйте скорее этот базовый шаблон: https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py (и следуйте всем остальным инструкциям в ответ выше)

Текущий шаблон https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py, присутствующий в основной ветке, содержит ссылку на класс "airflow.utils.log.s3_task_handler.S3TaskHandler", которого нет в apache-airflow==1.9.0 python пакет. Надеюсь это поможет!

Другие вопросы по тегам