asyncio.create_task с Motorclient.aggregate не дожидается завершения агрегирования
Я пытаюсь запустить несколько конвейеров агрегации параллельно и асинхронно, используя метод asyncio.create_task(). Вот мой код:
tasks = [asyncio.create_task(self.db_client.aggregate(db_name=self.db_name,
collection_name=collection,
pipeline=batch, allow_disk_use=True))
for batch in batches]
# Run the tasks concurrently and wait for the results
self.logger.info("Starting %s aggregation task(s)", len(tasks))
done, pending = await asyncio.wait(set(tasks),
timeout=1800,
return_when="ALL_COMPLETED")
В этом коде происходит то, что агрегатная функция немедленно отправляет MotorCursor, не дожидаясь завершения агрегирования, и такое поведение ожидается в соответствии с их документацией. Однако из-за такого поведения asyncio.wait не ждет, поскольку считает, что задачи выполнены. Так что на самом деле никакого ожидания не происходит. Может ли кто-нибудь дать несколько указаний о том, как заставить задачу asyncio ждать завершения операции MotorClient.aggregate? Не уверен, возможно ли это даже в асинхронной реализации.