Как использовать фоновые задачи со Starlette, когда нет фонового объекта?

Я надеюсь, что сейчас не буду использовать сельдерей. В документации Starlette они предоставляют два способа добавления фоновых задач:

Через Graphene: https://www.starlette.io/graphql/

class Query(graphene.ObjectType):
    user_agent = graphene.String()

    def resolve_user_agent(self, info):
        """
        Return the User-Agent of the incoming request.
        """
        user_agent = request.headers.get("User-Agent", "<unknown>")
        background = info.context["background"]
        background.add_task(log_user_agent, user_agent=user_agent)
        return user_agent

Через ответ JSON: https://www.starlette.io/background/

async def signup(request):
    data = await request.json()
    username = data['username']
    email = data['email']
    task = BackgroundTask(send_welcome_email, to_address=email)
    message = {'status': 'Signup successful'}
    return JSONResponse(message, background=task)

Кто-нибудь знает, как добавить задачи в фон Старлетты с Ариадной? Я не могу вернуть ответ JSONResponse в моем преобразователе, и у меня нет доступа к info.context["background"]. Единственное, что я привязал к своему контексту, - это объект запроса.

2 ответа

Решение

Решено!

ПО промежуточного слоя Starlette:

class BackgroundTaskMiddleware(BaseHTTPMiddleware):
    async def dispatch(
            self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        request.state.background = None
        response = await call_next(request)
        if request.state.background:
            response.background = request.state.background
        return response

Резольвер Ариадны:

@query.field("getUser")
@check_authentication
async def resolve_get_user(user, obj, info):
    task = BackgroundTasks()
    task.add_task(test_func)
    task.add_task(testing_func_two, "I work now")
    request = info.context["request"]
    request.state.background = task
    return True


async def test_func():
    await asyncio.sleep(10)
    print("once!!")


async def testing_func_two(message: str):
    print(message)

Функции по-прежнему выполняются синхронно, но, поскольку они являются фоновыми задачами, меня это не особо беспокоит.

Больше обсуждения здесь.

Вышеприведенное, помеченное как решение, не работает для меня, поскольку BackgroundTask не работает должным образом, когда вы используете промежуточное программное обеспечение, которое подклассы BaseHTTPMiddleware см. здесь:

https://github.com/encode/starlette/issues/919

В моем случае в основном задача не запускается в фоновом режиме и ожидается ее завершение, также я не использую Ариадну, но это должно позволить вам выполнять работу и запускать задачу в фоновом режиме.

Изменить: это сработало для меня.

      executor = ProcessPoolExecutor()

main.executor.submit(
bg_process_funcs,
export_file_format,
export_headers,
data,
alert_type,
floor_subtitle,
date_subtitle,
pref_datetime,
pref_timezone,
export_file_name,
export_limit,)

executor.shutdown()
logger.info("Process Pool Shutdown")
Другие вопросы по тегам