Проблема с потоком Fernet_Key при попытке запроса базы данных mssql
Я довольно новичок в Airflow. Я несколько раз прочитал документацию, разобрался с многочисленными вопросами S/O и множеством случайных статей в Интернете, но пока не решил эту проблему. У меня ощущение, что это что-то супер простое, я делаю неправильно. У меня есть Docker для Windows, и я вытащил puckel/docker-airflow
image и запустил контейнер с открытыми портами, чтобы я мог подключиться к интерфейсу с моего хоста. У меня работает другой контейнер mcr.microsoft.com/mssql/server
на котором я восстановил образец базы данных WideWorldImporters. Благодаря пользовательскому интерфейсу Airflow я смог успешно создать соединение с этой базой данных и даже запросить его в разделе "Профилирование данных". Проверьте изображения ниже: Создание соединения Успешный запрос к соединению
Таким образом, пока это работает, мой даг не удается на 2-м задании sqlData
, вот код:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mssql_operator import MsSqlOperator
from datetime import timedelta, datetime
copyData = DAG(
dag_id='copyData',
schedule_interval='@once',
start_date=datetime(2019,1,1)
)
printHelloBash = BashOperator(
task_id = "print_hello_Bash",
bash_command = 'echo "Lets copy some data"',
dag = copyData
)
mssqlConnection = "WWI"
sqlData = MsSqlOperator(sql="select top 100 InvoiceDate, TotalDryItems from sales.invoices",
task_id="select_some_data",
mssql_conn_id=mssqlConnection,
database="WideWorldImporters",
dag = copyData,
depends_on_past=True
)
queryDataSuccess = BashOperator(
task_id = "confirm_data_queried",
bash_command = 'echo "We queried data!"',
dag = copyData
)
printHelloBash >> sqlData >> queryDataSuccess
Изначально ошибка была:
*[2019-02-22 16:13:09,176] {{logging_mixin.py:95}} INFO - [2019-02-22 16:13:09,176] {{base_hook.py:83}} INFO - Using connection to: 172.17.0.3
[2019-02-22 16:13:09,186] {{models.py:1760}} ERROR - Could not create Fernet object: Incorrect padding
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 171, in get_fernet
_fernet = Fernet(fernet_key.encode('utf-8'))
File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 34, in __init__
key = base64.urlsafe_b64decode(key)
File "/usr/local/lib/python3.6/base64.py", line 133, in urlsafe_b64decode
return b64decode(s)
File "/usr/local/lib/python3.6/base64.py", line 87, in b64decode
return binascii.a2b_base64(s)
binascii.Error: Incorrect padding*
Я заметил, что это связано с криптографией, и я пошел вперед и побежал pip install cryptography
а также pip install airflow[crytpo]
где оба возвращали одинаковые результаты, сообщая мне, что требование уже выполнено. Наконец, я нашел что-то, что говорит, что мне просто нужно сгенерировать fernet_key. Ключ по умолчанию в моем файле airflow.cfg был fernet_key = $FERNET_KEY
, Итак из кли в контейнере я запустил:
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
И получил код, который я заменил $FERNET_KEY
с. Я перезапустил контейнер и снова запустил dag, и теперь моя ошибка:
[2019-02-22 16:22:13,641] {{models.py:1760}} ERROR -
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 106, in _verify_signature
h.verify(data[-32:])
File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/primitives/hmac.py", line 69, in verify
ctx.verify(signature)
File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 73, in verify
raise InvalidSignature("Signature did not match digest.")
cryptography.exceptions.InvalidSignature: Signature did not match digest.
Что из первоначального сканирования в криптовалюте связано с совместимостью?
Сейчас я в растерянности и решил, что задам этот вопрос, чтобы понять, могу ли я пойти по неверному пути в решении этого вопроса. Любая помощь будет принята с благодарностью, так как Airflow кажется потрясающим.
1 ответ
Благодаря некоторому побочному общению от @Tomasz я наконец-то получил свой DAG на работу. Он порекомендовал мне попробовать использовать docker-compose, который также указан в репозитории puckel/docker-airflow github. В итоге я использовал файл docker-compose-LocalExecutor.yml вместо Celery Executor. Было небольшое устранение неполадок и больше настроек, которые мне пришлось пройти. Для начала я взял свой существующий контейнер MSSQL, в котором был образец базы данных, и превратил его в изображение, используя docker commit mssql_container_name
, Единственная причина, по которой я это сделал, - это сэкономить время на восстановление резервной копии базы данных; вы всегда можете скопировать резервные копии в контейнер и восстановить их позже, если хотите. Затем я добавил свой новый образ в существующий файл docker-compose-LocalExecutor.yml следующим образом:
version: '2.1'
services:
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
mssql:
image: dw:latest
ports:
- "1433:1433"
webserver:
image: puckel/docker-airflow:1.10.2
restart: always
depends_on:
- postgres
- mssql
environment:
- LOAD_EX=n
- EXECUTOR=Local
#volumes:
#- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
Имейте в виду, dw - это то, что я назвал новым образом, основанным на контейнере mssql. Затем я переименовал файл в docker-compose.yml, чтобы я мог легко запустить docker-compose up
(не уверен, есть ли команда, указывающая непосредственно на другой файл YAML). Когда все было готово, я перешел к интерфейсу Airflow и настроил соединение. Примечание: поскольку вы используете docker-compose, вам не нужно знать IP-адрес других контейнеров, поскольку они используют обнаружение службы DNS, о котором я узнал здесь. Затем, чтобы проверить соединение, я пошел в Data Profiling, чтобы выполнить специальный запрос, но соединения там не было. Это связано с тем, что в образе puckel / docker-airflow не установлено pymssql. Так что просто забейте в контейнер docker exec -it airflow_webserver_container bash
и установить его pip install pymssql --user
, Выйдите из контейнера и перезапустите все сервисы, используя docker-compose restart
, Через минуту все заработало. Мое соединение обнаружилось в специальном запросе, и я смог успешно выбрать данные. Наконец, я включил DAG, планировщик поднял его, и все прошло успешно! Супер облегчение после нескольких недель поиска в Google. Спасибо @y2k-shubham за помощь и некоторую огромную благодарность @Tomasz, к которому я первоначально обратился после его удивительного и тщательного поста об Airflow в субреддите r/datascience.