Airflow2 с AWS RDS Postgres в качестве ошибки хранилища метаданных
Здравствуйте, разработчики, я изо всех сил пытался понять, как я могу использовать AWS RDS в качестве мета-магазина воздушного потока
я создаю проблему после просмотра нескольких сообщений и блогов
Рекомендации :
- https://github.com/puckel/docker-airflow/issues/149
- https://github.com/puckel/docker-airflow/pull/253
- https://www.youtube.com/watch?v=aTaytcxy2Ck&t=499s
Я пробовал использовать Postgres версии 9.6.17-R1, а также версию 12
Вот файл Docker COmpose, который я использую
version: '3'
x-airflow-common:
&airflow-common
image: apache/airflow:2.0.1
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: 'postgresql+psycopg2://<USERNAME>:<PASSWORD>@XXXX:5432/airflow-metastore'
# AIRFLOW__CELERY__RESULT_BACKEND: 'db+postgresql://<USERNAME>:<PASSWORD>@XXXX:5432/airflow-metastore'
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'
# POSTGRES_USER : XX
# POSTGRES_PASSWORD : XX
# POSTGRES_HOST : XXX
# export POSTGRES_PORT : XX
# POSTGRES_DB: airflow-metastore
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
command: pip3 install -r requirements.txt
depends_on:
redis:
condition: service_healthy
# postgres:
# condition: service_healthy
services:
# postgres:
# image: postgres:13
# environment:
# POSTGRES_USER: airflow
# POSTGRES_PASSWORD: airflow
# POSTGRES_DB: airflow
# healthcheck:
# test: ["CMD", "pg_isready", "-U", "airflow"]
# interval: 5s
# retries: 5
# restart: always
redis:
image: redis:latest
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
airflow-scheduler:
<<: *airflow-common
command: scheduler
restart: always
airflow-worker:
<<: *airflow-common
command: celery worker
restart: always
airflow-init:
<<: *airflow-common
command: version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
flower:
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
Вот что я пробовал
docker-compose up airflow-init выдает ошибку
[2021-03-21 14:22:24,386] {db.py:674} INFO - Creating tables
airflow-init_1 | Traceback (most recent call last):
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2336, in _wrap_pool_connect
airflow-init_1 | return fn()
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 304, in unique_connection
airflow-init_1 | return _ConnectionFairy._checkout(self)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 778, in _checkout
airflow-init_1 | fairy = _ConnectionRecord.checkout(pool)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 495, in checkout
airflow-init_1 | rec = pool._do_get()
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/impl.py", line 140, in _do_get
airflow-init_1 | self._dec_overflow()
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
airflow-init_1 | with_traceback=exc_tb,
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
airflow-init_1 | raise exception
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/impl.py", line 137, in _do_get
airflow-init_1 | return self._create_connection()
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 309, in _create_connection
airflow-init_1 | return _ConnectionRecord(self)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 440, in __init__
airflow-init_1 | self.__connect(first_connect_check=True)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 661, in __connect
airflow-init_1 | pool.logger.debug("Error on connect(): %s", e)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
airflow-init_1 | with_traceback=exc_tb,
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
airflow-init_1 | raise exception
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 656, in __connect
airflow-init_1 | connection = pool._invoke_creator(self)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
airflow-init_1 | return dialect.connect(*cargs, **cparams)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 508, in connect
airflow-init_1 | return self.dbapi.connect(*cargs, **cparams)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/psycopg2/__init__.py", line 127, in connect
airflow-init_1 | conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
airflow-init_1 | psycopg2.OperationalError: FATAL: database "airflow-metastore" does not exist
airflow-init_1 |
airflow-init_1 |
airflow-init_1 | The above exception was the direct cause of the following exception:
airflow-init_1 |
airflow-init_1 | Traceback (most recent call last):
airflow-init_1 | File "/home/airflow/.local/bin/airflow", line 8, in <module>
airflow-init_1 | sys.exit(main())
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
airflow-init_1 | args.func(args)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
airflow-init_1 | return func(*args, **kwargs)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 89, in wrapper
airflow-init_1 | return f(*args, **kwargs)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/db_command.py", line 48, in upgradedb
airflow-init_1 | db.upgradedb()
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/db.py", line 684, in upgradedb
airflow-init_1 | command.upgrade(config, 'heads')
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/alembic/command.py", line 294, in upgrade
airflow-init_1 | script.run_env()
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/alembic/script/base.py", line 481, in run_env
airflow-init_1 | util.load_python_file(self.dir, "env.py")
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/alembic/util/pyfiles.py", line 97, in load_python_file
airflow-init_1 | module = load_module_py(module_id, path)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/alembic/util/compat.py", line 182, in load_module_py
airflow-init_1 | spec.loader.exec_module(module)
airflow-init_1 | File "<frozen importlib._bootstrap_external>", line 678, in exec_module
airflow-init_1 | File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/airflow/migrations/env.py", line 108, in <module>
airflow-init_1 | run_migrations_online()
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/airflow/migrations/env.py", line 91, in run_migrations_online
airflow-init_1 | with connectable.connect() as connection:
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2263, in connect
airflow-init_1 | return self._connection_cls(self, **kwargs)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 104, in __init__
airflow-init_1 | else engine.raw_connection()
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2370, in raw_connection
airflow-init_1 | self.pool.unique_connection, _connection
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2340, in _wrap_pool_connect
airflow-init_1 | e, dialect, self
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1584, in _handle_dbapi_exception_noconnection
airflow-init_1 | sqlalchemy_exception, with_traceback=exc_info[2], from_=e
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
airflow-init_1 | raise exception
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2336, in _wrap_pool_connect
airflow-init_1 | return fn()
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 304, in unique_connection
airflow-init_1 | return _ConnectionFairy._checkout(self)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 778, in _checkout
airflow-init_1 | fairy = _ConnectionRecord.checkout(pool)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 495, in checkout
airflow-init_1 | rec = pool._do_get()
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/impl.py", line 140, in _do_get
airflow-init_1 | self._dec_overflow()
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
airflow-init_1 | with_traceback=exc_tb,
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
airflow-init_1 | raise exception
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/impl.py", line 137, in _do_get
airflow-init_1 | return self._create_connection()
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 309, in _create_connection
airflow-init_1 | return _ConnectionRecord(self)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 440, in __init__
airflow-init_1 | self.__connect(first_connect_check=True)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 661, in __connect
airflow-init_1 | pool.logger.debug("Error on connect(): %s", e)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
airflow-init_1 | with_traceback=exc_tb,
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
airflow-init_1 | raise exception
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 656, in __connect
airflow-init_1 | connection = pool._invoke_creator(self)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
airflow-init_1 | return dialect.connect(*cargs, **cparams)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 508, in connect
airflow-init_1 | return self.dbapi.connect(*cargs, **cparams)
airflow-init_1 | File "/home/airflow/.local/lib/python3.6/site-packages/psycopg2/__init__.py", line 127, in connect
airflow-init_1 | conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
airflow-init_1 | sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL: database "airflow-metastore" does not exist
airflow-init_1 |
airflow-init_1 | (Background on this error at: http://sqlalche.me/e/13/e3q8)
airflow-init_1 | usage: airflow [-h] GROUP_OR_COMMAND ...
nit_1 | positional arguments:
airflow-init_1 | GROUP_OR_COMMAND
airflow-init_1 |
airflow-init_1 | Groups:
airflow-init_1 | celery Celery components
airflow-init_1 | config View configuration
airflow-init_1 | connections Manage connections
airflow-init_1 | dags Manage DAGs
airflow-init_1 | db Database operations
airflow-init_1 | kubernetes Tools to help run the KubernetesExecutor
airflow-init_1 | pools Manage pools
airflow-init_1 | providers Display providers
airflow-init_1 | roles Manage roles
airflow-init_1 | tasks Manage tasks
airflow-init_1 | users Manage users
airflow-init_1 | variables Manage variables
airflow-init_1 |
airflow-init_1 | Commands:
airflow-init_1 | cheat-sheet Display cheat sheet
airflow-init_1 | info Show information about current Airflow and environment
airflow-init_1 | kerberos Start a kerberos ticket renewer
airflow-init_1 | plugins Dump information about loaded plugins
airflow-init_1 | rotate-fernet-key
airflow-init_1 | Rotate encrypted connection credentials and variables
airflow-init_1 | scheduler Start a scheduler instance
airflow-init_1 | sync-perm Update permissions for existing roles and DAGs
airflow-init_1 | version Show the version
airflow-init_1 | webserver Start a Airflow webserver instance
airflow-init_1 |
airflow-init_1 | optional arguments:
airflow-init_1 | -h, --help show this help message and exit
airflow-init_1 |
airflow-init_1 | airflow command error: argument GROUP_OR_COMMAND: `airflow upgradedb` command, has been removed, please use `airflow db upgrade`, see help above.
airflow-init_1 | 2.0.1
Я много пробовал искать это и не мог найти решения, любая помощь была бы отличной