Использование работника Celery для взаимодействия с базой данных SQLAlchemy, включая знание пользователя по запросу
Я сделал много исследований по этому вопросу, в том числе пытаясь ответить на подобные вопросы. Похоже, что Celery не имеет доступа к контексту моего приложения Flask.
Я прекрасно знаю, что мой объект из сельдерея, который украсит мои задачи, должен иметь доступ к контексту моего приложения Flask. И я верю, что должен, поскольку я следовал этому руководству, чтобы создать свой объект сельдерея. Я не уверен, что путаница кроется где-то в том, что я использую Flask-HTTPAuth.
Вот кое-что из того, что у меня есть.
def make_celery(app):
celery = Celery(app.import_name, backend=app.config["CELERY_RESULT_BACKEND"], broker=app.config["CELERY_BROKER_URL"])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
auth = HTTPBasicAuth()
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///flask_app.db"
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379"
celery = make_celery(app)
db = SQLAlchemy(app)
@celery.task(bind=True, name="flask_app.item_loop")
def loop(self):
items = g.user.items
for item in items:
print(item)
Однако запуск этой задачи с использованием Flask не требуется. Я пытаюсь запустить эту функцию, нажав на сервер (пока авторизован!).
@app.route("/item_loop")
@auth.login_required
def item_loop():
result = loop.delay()
return "It's running."
Но работник сельдерея говорит мне задачу raised unexpected: AttributeError("'_AppCtxGlobals' object has no attribute 'user'",)
Как я уже сказал, это подразумевает, что, как уже упоминалось, мой объект сельдерея не имеет контекста приложения, хотя я использовал рекомендованный шаблон фабрики.
3 ответа
Хотя рекомендации в ответах Дейва и Грега действительны, они упускают из виду то, что у вас есть неправильное понимание использования контекста приложения в задаче Celery.
У вас есть приложение Flask, в котором вы используете Flask-HTTPAuth. У вас наверное есть verify_password
обработчик, который устанавливает g.user
аутентифицированному пользователю. Это означает, что во время обработки запроса вы можете получить доступ к пользователю как g.user
, Это все хорошо.
У вас также есть один или несколько сотрудников Celery, которые представляют собой отдельные процессы, которые не имеют прямого подключения к серверу Flask. Единственный обмен данными между сервером Flask и рабочими процессами Celery происходит через используемый вами брокер сообщений (обычно Redis или RabbitMQ).
В зависимости от ваших потребностей работникам Celery может потребоваться доступ к приложению Flask. Это очень распространено при использовании расширений Flask, которые сохраняют свою конфигурацию в app.config
толковый словарь. Два распространенных расширения, которые требуют этого, - Flask-SQLAlchemy и Flask-Mail. Без доступа к app.config
задача Celery не сможет открыть соединение с базой данных или отправить электронное письмо, поскольку она не будет знать детали базы данных и / или почтового сервера.
Чтобы предоставить работникам Celery доступ к конфигурации, принято создавать дубликаты приложений Flask на каждом работнике. Это вторичные приложения, которые никак не связаны с реальным объектом приложения, используемым основным сервером Flask. Их единственная цель состоит в том, чтобы держать копию оригинала app.config
словарь, к которому может получить доступ ваша задача или любые расширения Flask, используемые вашей задачей.
Так что нельзя ожидать, что g.user
набор на сервере Flask будет доступен также как g.user
в задаче Celery, просто потому, что они разные g
объекты из разных экземпляров приложения.
Если вам нужно использовать аутентифицированного пользователя в задаче Celery, вам нужно передать user_id
(обычно g.user.id
) в качестве аргумента к вашей задаче. Затем в вашей задаче вы можете загрузить пользователя из базы данных, используя эту id
, Надеюсь это поможет!
Чтобы извлечь пользователя из выполнения задачи, вы можете попробовать передать объект User (если сельдерей может его засолить) или передать достаточно информации, чтобы задача могла извлечь объект User (например, идентификатор пользователя). В этом последнем случае ваша задача будет выглядеть примерно так
@celery.task(bind=True, name="flask_app.item_loop")
def loop(self, user_id):
user = User.query.get(user_id)
items = user.items
for item in items:
print(item)
и вы начнете (если вы используете flask_login) через
result = loop.delay(current_user.id)
Как отмечает @Dave W. Smith, а не полагаться на g
для извлечения пользователя может быть лучше передать информацию о пользователе в качестве аргумента в задачу Celery. Согласно документации Flask по контексту приложения, время жизни g
это запрос. Поскольку задача Celery выполняется асинхронно, она будет выполняться в другом контексте приложения, чем в запросе, в котором вы определили пользователя.