Подключение Airflow S3 с помощью пользовательского интерфейса

Я пытался использовать Airflow для планирования DAG. Один из DAG включает в себя задачу, которая загружает данные из корзины s3.

Для вышеуказанной цели мне нужно настроить соединение s3. Но интерфейс, предоставляемый airflow, не настолько интуитивен ( http://pythonhosted.org/airflow/configuration.html?highlight=connection). Кто-нибудь преуспел в настройке соединения s3, если да, то есть ли какие-нибудь лучшие практики, которым вы, ребята, следовали?

Благодарю.

7 ответов

Решение

Трудно найти ссылки, но, немного покопавшись, я смог заставить его работать.

TLDR

Создайте новое соединение со следующими атрибутами:

Conn Id: my_conn_S3

Тип соединения: S3

Дополнительно:

{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}

Длинная версия, настройка UI-соединения:

  • В интерфейсе Airflow выберите "Администрирование"> "Подключения".
  • Создайте новое соединение со следующими атрибутами: Идентификатор соединения: my_conn_S3, Тип соединения: S3, Дополнительно: {"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
  • Оставьте все остальные поля (Host, Schema, Login) пустыми.

Чтобы использовать это соединение, ниже вы можете найти простой тест датчика S3. Идея этого теста состоит в том, чтобы настроить датчик, который просматривает файлы на этапе S3 (задача T1), и, как только выполняется условие, указанное ниже, он запускает команду bash (задача T2).

тестирование

  • Перед запуском группы обеспечения доступности баз данных убедитесь, что у вас есть корзина S3 с именем "S3-Bucket-To-Watch".
  • Добавьте ниже s3_dag_test.py в папку с воздушными потоками (~/airflow/dags)
  • Начните airflow webserver,
  • Перейти к интерфейсу Airflow ( http://localhost:8383/)
  • Начните airflow scheduler,
  • Включите группу DAG 's3_dag_test' в главном представлении группы DAG.
  • Выберите "s3_dag_test", чтобы показать подробности о dag.
  • На графике вы должны увидеть его текущее состояние.
  • Задача 'check_s3_for_file_in_s3' должна быть активной и запущенной.
  • Теперь добавьте файл с именем "file-to-watch-1" к вашему "S3-Bucket-To-Watch".
  • Первые задачи должны быть выполнены, вторые должны быть начаты и завершены.

Schedule_interval в определении dag установлен на '@once', чтобы облегчить отладку.

Чтобы запустить его снова, оставьте все как есть, удалите файлы из корзины и попробуйте еще раз, выбрав первую задачу (в виде графика) и выбрав "Очистить" все "Прошлое", "Будущее", "Вверх по течению", "Вниз по течению".... деятельность. Это должно снова запустить DAG.

Дайте мне знать, как все прошло.

s3_dag_test.py;

"""
S3 Sensor Connection Test
"""

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor,   BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 11, 1),
    'email': ['something@here.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')

t1 = BashOperator(
    task_id='bash_test',
    bash_command='echo "hello, it should work" > s3_conn_test.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='check_s3_for_file_in_s3',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='S3-Bucket-To-Watch',
    s3_conn_id='my_conn_S3',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)

Основные ссылки:

Предполагая, что поток воздуха размещен на сервере EC2.

просто создайте соединение согласно другим ответам, но оставьте все пустым в конфигурации, кроме типа соединения, который должен оставаться как S3

S3hook по умолчанию будет boto, и это будет по умолчанию роль сервера EC2, на котором вы выполняете воздушный поток. при условии, что эта роль имеет права на S3, ваша задача сможет получить доступ к корзине.

это гораздо более безопасный способ, чем использование и хранение учетных данных.

Если вы беспокоитесь о предоставлении учетных данных в пользовательском интерфейсе, другой способ - передать местоположение файла учетных данных в дополнительном параметре в пользовательском интерфейсе. Только функциональный пользователь имеет права на чтение файла. Это выглядит примерно так

Extra:  {
    "profile": "<profile_name>", 
    "s3_config_file": "/home/<functional_user>/creds/s3_credentials", 
    "s3_config_format": "aws" }

файл "/home/<functional_user>/creds/s3_credentials"имеет ниже записи

[<profile_name>]
aws_access_key_id = <access_key_id>
aws_secret_access_key = <secret_key>

Другой вариант, который работал для меня, заключался в том, чтобы ввести ключ доступа в качестве "логина" и секретный ключ в качестве "пароля":

Conn Id: <arbitrary_conn_id>
Conn Type: S3
Login: <aws_access_key>
Password: <aws_secret_key>

Оставьте все остальные поля пустыми.

Несколько версий назад мы добавили это в нашу документацию:

http://airflow.apache.org/docs/stable/howto/connection/aws.html

Нет разницы между подключением AWS и подключением S3.

Принятый ответ здесь имеет ключ и секрет в дополнительном /JSON, и, хотя он все еще работает (с 1.10.10), его больше не рекомендуется использовать, поскольку он отображает секрет в виде обычного текста в пользовательском интерфейсе.

Для новой версии измените код Python на примере выше.

s3_conn_id='my_conn_S3'

в

aws_conn_id='my_conn_s3'
Conn Id: example_s3_connnection
Conn Type: S3
Extra:{"aws_access_key_id":"xxxxxxxxxx", "aws_secret_access_key": "yyyyyyyyyyy"}

Примечание: поля Логин и Пароль оставлены пустыми.

Для aws в Китае, он не работает с потоком воздуха ==1.8.0 нужно обновить до 1.9.0, но airflow 1.9.0 смените имя на apache-airflow==1.9.0

Другие вопросы по тегам