Подключение Airflow S3 с помощью пользовательского интерфейса
Я пытался использовать Airflow для планирования DAG. Один из DAG включает в себя задачу, которая загружает данные из корзины s3.
Для вышеуказанной цели мне нужно настроить соединение s3. Но интерфейс, предоставляемый airflow, не настолько интуитивен ( http://pythonhosted.org/airflow/configuration.html?highlight=connection). Кто-нибудь преуспел в настройке соединения s3, если да, то есть ли какие-нибудь лучшие практики, которым вы, ребята, следовали?
Благодарю.
7 ответов
Трудно найти ссылки, но, немного покопавшись, я смог заставить его работать.
TLDR
Создайте новое соединение со следующими атрибутами:
Conn Id: my_conn_S3
Тип соединения: S3
Дополнительно:
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
Длинная версия, настройка UI-соединения:
- В интерфейсе Airflow выберите "Администрирование"> "Подключения".
- Создайте новое соединение со следующими атрибутами: Идентификатор соединения: my_conn_S3, Тип соединения: S3, Дополнительно: {"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
- Оставьте все остальные поля (Host, Schema, Login) пустыми.
Чтобы использовать это соединение, ниже вы можете найти простой тест датчика S3. Идея этого теста состоит в том, чтобы настроить датчик, который просматривает файлы на этапе S3 (задача T1), и, как только выполняется условие, указанное ниже, он запускает команду bash (задача T2).
тестирование
- Перед запуском группы обеспечения доступности баз данных убедитесь, что у вас есть корзина S3 с именем "S3-Bucket-To-Watch".
- Добавьте ниже s3_dag_test.py в папку с воздушными потоками (~/airflow/dags)
- Начните
airflow webserver
, - Перейти к интерфейсу Airflow ( http://localhost:8383/)
- Начните
airflow scheduler
, - Включите группу DAG 's3_dag_test' в главном представлении группы DAG.
- Выберите "s3_dag_test", чтобы показать подробности о dag.
- На графике вы должны увидеть его текущее состояние.
- Задача 'check_s3_for_file_in_s3' должна быть активной и запущенной.
- Теперь добавьте файл с именем "file-to-watch-1" к вашему "S3-Bucket-To-Watch".
- Первые задачи должны быть выполнены, вторые должны быть начаты и завершены.
Schedule_interval в определении dag установлен на '@once', чтобы облегчить отладку.
Чтобы запустить его снова, оставьте все как есть, удалите файлы из корзины и попробуйте еще раз, выбрав первую задачу (в виде графика) и выбрав "Очистить" все "Прошлое", "Будущее", "Вверх по течению", "Вниз по течению".... деятельность. Это должно снова запустить DAG.
Дайте мне знать, как все прошло.
s3_dag_test.py;
"""
S3 Sensor Connection Test
"""
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 1),
'email': ['something@here.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')
t1 = BashOperator(
task_id='bash_test',
bash_command='echo "hello, it should work" > s3_conn_test.txt',
dag=dag)
sensor = S3KeySensor(
task_id='check_s3_for_file_in_s3',
bucket_key='file-to-watch-*',
wildcard_match=True,
bucket_name='S3-Bucket-To-Watch',
s3_conn_id='my_conn_S3',
timeout=18*60*60,
poke_interval=120,
dag=dag)
t1.set_upstream(sensor)
Основные ссылки:
Предполагая, что поток воздуха размещен на сервере EC2.
просто создайте соединение согласно другим ответам, но оставьте все пустым в конфигурации, кроме типа соединения, который должен оставаться как S3
S3hook по умолчанию будет boto, и это будет по умолчанию роль сервера EC2, на котором вы выполняете воздушный поток. при условии, что эта роль имеет права на S3, ваша задача сможет получить доступ к корзине.
это гораздо более безопасный способ, чем использование и хранение учетных данных.
Если вы беспокоитесь о предоставлении учетных данных в пользовательском интерфейсе, другой способ - передать местоположение файла учетных данных в дополнительном параметре в пользовательском интерфейсе. Только функциональный пользователь имеет права на чтение файла. Это выглядит примерно так
Extra: {
"profile": "<profile_name>",
"s3_config_file": "/home/<functional_user>/creds/s3_credentials",
"s3_config_format": "aws" }
файл "/home/<functional_user>/creds/s3_credentials
"имеет ниже записи
[<profile_name>]
aws_access_key_id = <access_key_id>
aws_secret_access_key = <secret_key>
Другой вариант, который работал для меня, заключался в том, чтобы ввести ключ доступа в качестве "логина" и секретный ключ в качестве "пароля":
Conn Id: <arbitrary_conn_id>
Conn Type: S3
Login: <aws_access_key>
Password: <aws_secret_key>
Оставьте все остальные поля пустыми.
Несколько версий назад мы добавили это в нашу документацию:
http://airflow.apache.org/docs/stable/howto/connection/aws.html
Нет разницы между подключением AWS и подключением S3.
Принятый ответ здесь имеет ключ и секрет в дополнительном /JSON, и, хотя он все еще работает (с 1.10.10), его больше не рекомендуется использовать, поскольку он отображает секрет в виде обычного текста в пользовательском интерфейсе.
Для новой версии измените код Python на примере выше.
s3_conn_id='my_conn_S3'
в
aws_conn_id='my_conn_s3'
Conn Id: example_s3_connnection
Conn Type: S3
Extra:{"aws_access_key_id":"xxxxxxxxxx", "aws_secret_access_key": "yyyyyyyyyyy"}
Примечание: поля Логин и Пароль оставлены пустыми.
Для aws в Китае, он не работает с потоком воздуха ==1.8.0 нужно обновить до 1.9.0, но airflow 1.9.0 смените имя на apache-airflow==1.9.0