Сохраните и получите доступ к паролю с помощью Apache airflow
Мы используем воздушный поток в качестве планировщика. Я хочу вызвать простой оператор bash в DAG. Сценарию bash необходим пароль в качестве аргумента для дальнейшей обработки.
Как я могу безопасно хранить пароль в airflow (config/variable /connection) и получить к нему доступ в файле определения dag.
Я новичок в Airflow и Python, поэтому фрагмент кода будет оценен.
8 ответов
Вы можете сохранить пароль в Hook - он будет зашифрован, если вы настроили свой ключ фернет.
Вот как вы можете создать соединение.
from airflow.models import Connection
def create_conn(username, password, host=None):
new_conn = Connection(conn_id=f'{username}_connection',
login=username,
host=host if host else None)
new_conn.set_password(password)
Затем этот пароль шифруется в установленной вами базе данных.
Чтобы получить доступ к этому паролю:
from airflow.hooks.base_hook import BaseHook
connection = BaseHook.get_connection("username_connection")
password = connection.password # This is a getter that returns the unencrypted password.
Этот ответ может быть немного запоздалым, но я думаю, что он важен и все еще актуален:
Когда используешь
BaseHook.get_hook(conn_id=conn_id)
учетные данные регистрируются как обычный текст в файле журнала Airflow (мы наблюдали в версии 2.2.3) по пути
/var/log/airflow/scheduler/<date>/<dag>/
Вы точно не хотите, чтобы вы вводили логин и пароль там.
Чтобы этого избежать, используйте
get_connection_from_secrets
как в:
from airflow.models import Connection
Connection.get_connection_from_secrets("<connection>")
При этом никакие учетные данные не регистрируются в файле.
from airflow.hooks.base_hook import BaseHook
conn = BaseHook.get_connection('bigquery_connection')
print(conn.get_extra())
Эти conn.get_extra()
предоставит вам JSON настроек, хранящихся в соединении.
Вы можете хранить пароль в переменных воздушного потока, https://airflow.incubator.apache.org/ui.html
- Создайте переменную с ключом и значением в пользовательском интерфейсе, например, mypass:XXX
- Переменная импорта
from airflow.models import Variable
- MyPass = Variable.get("mypass")
- Передайте MyPass вашему скрипту bash:
command = """
echo "{{ params.my_param }}"
"""
task = BashOperator(
task_id='templated',
bash_command=command,
params={'my_param': MyPass},
dag=dag)
Используйте графический интерфейс на вкладке администратора / подключений.
Ответ, который действительно работает, с программным сохранением соединения в Airflow, работает, как в приведенном ниже фрагменте.
В приведенном ниже примере myservice
представляет собой некоторый внешний кеш учетных данных.
При использовании подхода, описанного ниже, вы можете хранить свои подключения, которыми вы управляете извне, внутри воздушного потока. Без необходимости опрашивать службу внутри каждого дага / задачи. Вместо этого вы можете полагаться на механизм подключения Airflow, и вам не нужно терять операторов, которые предоставляет Airflow (если ваша организация это разрешает).
Уловка заключается в использовании airflow.utils.db.merge_conn
для обработки настройки созданного вами объекта подключения.
from airflow.utils.db import provide_session, merge_conn
creds = {"user": myservice.get_user(), "pwd": myservice.get_pwd()
c = Connection(conn_id=f'your_airflow_connection_id_here',
login=creds["user"],
host=None)
c.set_password(creds["pwd"])
merge_conn(c)
merge_conn является встроенным и используется самим воздушным потоком для инициализации пустых соединений. Однако оно не будет обновляться автоматически. для этого вам придется использовать вашу собственную вспомогательную функцию.
from airflow.utils.db import provide_session
@provide_session
def store_conn(conn, session=None):
from airflow.models import Connection
if session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
logging.info("Connection object already exists, attempting to remove it...")
session.delete(session.query(Connection).filter(Connection.conn_id == conn.conn_id).first())
session.add(conn)
session.commit()
В этом случае я бы использовал PythonOperator, из которого вы можете получить Hook
на вашем подключении к базе данных, используяhook = PostgresHook(postgres_conn_id=postgres_conn_id)
, Вы можете позвонить get_connection
на этом хуке, который даст вам объект Connection, из которого вы можете получить хост, логин и пароль для вашего подключения к базе данных.
Наконец, используйте, например, subprocess.call(your_script.sh, connection_string)
передача сведений о соединении в качестве параметра.
Этот метод немного запутан, но он позволяет сохранить шифрование для соединений с базой данных в Airflow. Кроме того, вы должны иметь возможность вытащить эту стратегию в отдельный класс Operator, унаследовав базовое поведение от PythonOperator, но добавив логику для получения хука и вызова сценария bash.
Я написал следующий служебный метод для создания сеанса с конфигурацией внешней базы данных, сохраненной в Airflow:
from airflow.hooks.base_hook import BaseHook
from sqlalchemy.orm.session import sessionmaker
def get_session(conn_id):
dbhook = BaseHook.get_hook(conn_id=conn_id)
engine = create_engine(dbhook.get_uri())
Session = sessionmaker()
session = Session(bind=engine)
return session
Это то, что я использовал.
def add_slack_token(ds, **kwargs):
""""Add a slack token"""
session = settings.Session()
new_conn = Connection(conn_id='slack_token')
new_conn.set_password(SLACK_LEGACY_TOKEN)
if not (session.query(Connection).filter(Connection.conn_id ==
new_conn.conn_id).first()):
session.add(new_conn)
session.commit()
else:
msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
msg = msg.format(conn_id=new_conn.conn_id)
print(msg)
dag = DAG(
'add_connections',
default_args=default_args,
schedule_interval="@once")
t2 = PythonOperator(
dag=dag,
task_id='add_slack_token',
python_callable=add_slack_token,
provide_context=True,
)