Как использовать фоновые задачи со Starlette, когда нет фонового объекта?
Я надеюсь, что сейчас не буду использовать сельдерей. В документации Starlette они предоставляют два способа добавления фоновых задач:
Через Graphene: https://www.starlette.io/graphql/
class Query(graphene.ObjectType):
user_agent = graphene.String()
def resolve_user_agent(self, info):
"""
Return the User-Agent of the incoming request.
"""
user_agent = request.headers.get("User-Agent", "<unknown>")
background = info.context["background"]
background.add_task(log_user_agent, user_agent=user_agent)
return user_agent
Через ответ JSON: https://www.starlette.io/background/
async def signup(request):
data = await request.json()
username = data['username']
email = data['email']
task = BackgroundTask(send_welcome_email, to_address=email)
message = {'status': 'Signup successful'}
return JSONResponse(message, background=task)
Кто-нибудь знает, как добавить задачи в фон Старлетты с Ариадной? Я не могу вернуть ответ JSONResponse в моем преобразователе, и у меня нет доступа к info.context["background"]. Единственное, что я привязал к своему контексту, - это объект запроса.
2 ответа
Решено!
ПО промежуточного слоя Starlette:
class BackgroundTaskMiddleware(BaseHTTPMiddleware):
async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
request.state.background = None
response = await call_next(request)
if request.state.background:
response.background = request.state.background
return response
Резольвер Ариадны:
@query.field("getUser")
@check_authentication
async def resolve_get_user(user, obj, info):
task = BackgroundTasks()
task.add_task(test_func)
task.add_task(testing_func_two, "I work now")
request = info.context["request"]
request.state.background = task
return True
async def test_func():
await asyncio.sleep(10)
print("once!!")
async def testing_func_two(message: str):
print(message)
Функции по-прежнему выполняются синхронно, но, поскольку они являются фоновыми задачами, меня это не особо беспокоит.
Вышеприведенное, помеченное как решение, не работает для меня, поскольку BackgroundTask не работает должным образом, когда вы используете промежуточное программное обеспечение, которое подклассы BaseHTTPMiddleware см. здесь:
https://github.com/encode/starlette/issues/919
В моем случае в основном задача не запускается в фоновом режиме и ожидается ее завершение, также я не использую Ариадну, но это должно позволить вам выполнять работу и запускать задачу в фоновом режиме.
Изменить: это сработало для меня.
executor = ProcessPoolExecutor()
main.executor.submit(
bg_process_funcs,
export_file_format,
export_headers,
data,
alert_type,
floor_subtitle,
date_subtitle,
pref_datetime,
pref_timezone,
export_file_name,
export_limit,)
executor.shutdown()
logger.info("Process Pool Shutdown")