Получение RuntimeError: цикл событий закрыт, а будущее принадлежит другому циклу, отличному от указанного в качестве аргумента цикла — asyncio + Python
Я пытаюсь заархивировать минимальные асинхронные параллельные задачи, загружая некоторое содержимое и помещая его в очередь Kafka. Но все работает так, как ожидалось, за исключением того, что выдает ошибку и выходит при первом нажатии.
Ошибка:
venv\lib\site-packages\aiokafka\producer\message_accumulator.py:305> exception=ValueError('The future belongs to a different loop than the one specified as the loop argument')>" excp="ValueError: The future belongs to a different loop than the one specified as the loop argument"
level=error ts=time logger=__main__ msg="application exception" excp="RuntimeError: Event loop is closed"
level=error ts=time logger=asyncio msg="Future exception was never retrieved future: <Future finished exception=RuntimeError('Event loop is closed')>" excp="RuntimeError: Event loop is closed"
Прикрепляю базовую скелетную структуру классов, так как я не мог найти, что происходит за кулисами, увидев эту ошибку, например, цикл событий закрыт . Я думаю, что сообщество может помочь мне понять это и даже исправить.
main .py — успешная инициализация
from aiokafka import AIOKafkaProducer
async def main(args):
"""Main async entry-point."""
print("avro serializer registered")
producer = AIOKafkaProducer(bootstrap_servers=*,value_serializer=*,max_batch_size=*)
print("kafka producer initialized")
consumer = TestConsumer(producer=producer, s3=s3)
try:
await producer.start()
await consumer.consume()
finally:
await producer.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog=__package__, description=__doc__,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
args = parser.parse_args()
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
try:
sys.exit(asyncio.run(main(args)))
except:
print("exception")
потребитель.py
class TestConsumer:
def __init__(self, producer, s3):
self.producer = producer
self.s3 = s3
async def download_and_push_kafka(self, files):
# Download part got skipped. final variable is response
print("successfully downloaded content..")
await self.producer.send(topic=self.topic, value=response) # This successfully push data to queue but getting exception in future completed
def start_scraping(self, msg):
""" Starts an async process for requesting download and push to kafka """
asyncio.run(self.download_and_push_kafka(msg['path']))
async def consume(self):
with ThreadPoolExecutor(max_workers=2) as executor:
while True:
for msg in queue:
future_to_key = {executor.submit(self.start_scraping, key): key for key in
msg['files']}
for future in as_completed(future_to_key):
key = future_to_key[future]
exception = future.exception()
await asyncio.sleep(5)
На данный момент наблюдения:
- Первые данные успешно отправлены в kafka
- Немедленно получая эти ошибки, и приложение перестало работать
Еще раз спасибо за помощь.