asyncio.create_task с Motorclient.aggregate не дожидается завершения агрегирования

Я пытаюсь запустить несколько конвейеров агрегации параллельно и асинхронно, используя метод asyncio.create_task(). Вот мой код:

          tasks = [asyncio.create_task(self.db_client.aggregate(db_name=self.db_name,
                                                          collection_name=collection,
                                                          pipeline=batch, allow_disk_use=True))
             for batch in batches]

    # Run the tasks concurrently and wait for the results
    self.logger.info("Starting %s aggregation task(s)", len(tasks))
    done, pending = await asyncio.wait(set(tasks),
                                       timeout=1800,
                                       return_when="ALL_COMPLETED")

В этом коде происходит то, что агрегатная функция немедленно отправляет MotorCursor, не дожидаясь завершения агрегирования, и такое поведение ожидается в соответствии с их документацией. Однако из-за такого поведения asyncio.wait не ждет, поскольку считает, что задачи выполнены. Так что на самом деле никакого ожидания не происходит. Может ли кто-нибудь дать несколько указаний о том, как заставить задачу asyncio ждать завершения операции MotorClient.aggregate? Не уверен, возможно ли это даже в асинхронной реализации.

0 ответов

Другие вопросы по тегам