Получение RuntimeError: цикл событий закрыт, а будущее принадлежит другому циклу, отличному от указанного в качестве аргумента цикла — asyncio + Python

Я пытаюсь заархивировать минимальные асинхронные параллельные задачи, загружая некоторое содержимое и помещая его в очередь Kafka. Но все работает так, как ожидалось, за исключением того, что выдает ошибку и выходит при первом нажатии.

Ошибка:

          venv\lib\site-packages\aiokafka\producer\message_accumulator.py:305> exception=ValueError('The future belongs to a different loop than the one specified as the loop argument')>" excp="ValueError: The future belongs to a different loop than the one specified as the loop argument"

    level=error ts=time logger=__main__ msg="application exception" excp="RuntimeError: Event loop is closed"
    level=error ts=time logger=asyncio msg="Future exception was never retrieved future: <Future finished exception=RuntimeError('Event loop is closed')>" excp="RuntimeError: Event loop is closed"

Прикрепляю базовую скелетную структуру классов, так как я не мог найти, что происходит за кулисами, увидев эту ошибку, например, цикл событий закрыт . Я думаю, что сообщество может помочь мне понять это и даже исправить.

main .py — успешная инициализация

      from aiokafka import AIOKafkaProducer

async def main(args):
    """Main async entry-point."""
    print("avro serializer registered")
    producer = AIOKafkaProducer(bootstrap_servers=*,value_serializer=*,max_batch_size=*)
    print("kafka producer initialized")
    consumer = TestConsumer(producer=producer, s3=s3)
    try:
        await producer.start()
        await consumer.consume()
    finally:
        await producer.stop()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(prog=__package__, description=__doc__,
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    args = parser.parse_args()
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    try:
        sys.exit(asyncio.run(main(args)))
    except:
        print("exception")

потребитель.py

      class TestConsumer:
    def __init__(self, producer, s3):
        self.producer = producer
        self.s3 = s3

    async def download_and_push_kafka(self, files):
        # Download part got skipped. final variable is response
        print("successfully downloaded content..")
        await self.producer.send(topic=self.topic, value=response) # This successfully push data to queue but getting exception in future completed

    def start_scraping(self, msg):
        """ Starts an async process for requesting download and push to kafka """
        asyncio.run(self.download_and_push_kafka(msg['path']))

    async def consume(self):
        with ThreadPoolExecutor(max_workers=2) as executor:
            while True:
                for msg in queue:
                    future_to_key = {executor.submit(self.start_scraping, key): key for key in
                                     msg['files']}
                    for future in as_completed(future_to_key):
                        key = future_to_key[future]
                        exception = future.exception()
                    await asyncio.sleep(5)

На данный момент наблюдения:

  1. Первые данные успешно отправлены в kafka
  2. Немедленно получая эти ошибки, и приложение перестало работать

Еще раз спасибо за помощь.

0 ответов

Другие вопросы по тегам