RuntimeError: Задача присоединена к другому циклу
Привет, я использую AsyncIOMotorClient для асинхронных вызовов БД в mongoDb. Ниже мой код.
xyz.py
async def insertMany(self,collection_name,documents_to_insert):
try:
collection=self.database[collection_name]
document_inserted = await collection.insert_many(documents_to_insert)
return document_inserted
except Exception:
raise
def insertManyFn(self,collection_name,documents_to_insert):
try:
loop=asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop1=asyncio.get_event_loop()
inserted_documents_count = loop1.run_until_complete(self.insertMany(collection_name, documents_to_insert))
if inserted_documents_count==len(documents_to_insert):
document_to_insert={Config.DB_JOB_COLUMN:Job.job_id,Config.DB_JOB_RESULT_COLUMN:Config.DB_JOB_RESULT_SUCCESS}
loop1.run_until_complete(self.insertOne(Config.DB_JOB_COLLECTION, document_to_insert))
except Exception:
raise
xyz1.py
t=Timer(10,xyz.insertManyFn,\
(collection_name,documents_to_insert))
t.start()
Во время выполнения этого я получаю исключение
RuntimeError: Task <Task pending coro=<xyz.insertMany() running at <my workspace location>/xyz.py:144> cb=[_run_until_complete_cb() at /usr/lib64/python3.5/asyncio/base_events.py:164]> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/lib64/python3.5/asyncio/futures.py:431]> attached to a different loop
В приведенной выше программе insertManyFn будет вызываться через 10 секунд и выполнять операцию вставки. Но когда он делает первый вызов insertMany, я получаю исключение.
4 ответа
Я все еще хочу, чтобы мой MotorClient находился на верхнем уровне модуля, поэтому я делаю следующее: я исправляю
MotorClient.get_io_loop
чтобы всегда возвращать текущий цикл.
import asyncio
import motor.core
from motor.motor_asyncio import (
AsyncIOMotorClient as MotorClient,
)
# MongoDB client
client = MotorClient('mongodb://localhost:27017/test')
client.get_io_loop = asyncio.get_running_loop
# The current database ("test")
db = client.get_default_database()
# async context
async def main():
posts = db.posts
await posts.insert_one({'title': 'great success!')
# Run main()
asyncio.run(main())
Согласно документации, AsyncIOMotorClient
должен быть передан Ioloop, если вы не используете по умолчанию. Попробуйте создать клиент после создания цикла событий:
loop=asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = AsyncIOMotorClient(io_loop=loop)
Ответ от @kolypto спасает мою ночь, но если вы хотите пропатчить какой-то фреймворк или код ORM поверх клиента Motor, вам нужно пропатчить клиент, в моем случае я используюfastapi_contrib
Модели MongoDB наpytest
и я должен исправитьAgnosticClient
сорт.
import asyncio
from motor.core import AgnosticClient
AgnosticClient.get_io_loop = asyncio.get_running_loop
Я изменил код, и он работает.
def insertManyFn(self,loop,collection_name,documents_to_insert):
try:
inserted_documents_count = loop.run_until_complete(self.insertMany(event_loop,collection_name, documents_to_insert))
if len(inserted_documents_count)==len(documents_to_insert):
document_to_insert={Config.DB_JOB_COLUMN:Job.job_id,Config.DB_JOB_RESULT_COLUMN:Config.DB_JOB_RESULT_SUCCESS}
loop1.run_until_complete(self.insertOne(Config.DB_JOB_COLLECTION, document_to_insert))
except Exception:
raise
loop=asyncio.get_event_loop()
t=Timer(10,self.xyz.insertManyFn,(loop,collection_name,documents_to_insert))
t.start()
Пояснение - я использую таймер Python Threading, который создает собственный поток для выполнения функции через определенное время. Итак, внутри этого потока я получал цикл обработки событий, который не должен быть правильным, сначала он должен получить цикл обработки событий и создать в нем поток таймера. Я думаю, это единственная причина.