Как преобразовать операцию ThreadPoolExecutor в asyncio — Python

У меня есть код, который работает с использованием ThreadPoolExecutor и работает параллельно с использованием рабочих потоков.

      from aiokafka import AIOKafkaProducer

producer = AIOKafkaProducer(....)

def consume_messages_from_sqs(sqs):
    with ThreadPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
        while True:
            for msg in sqs.receive_messages(VisibilityTimeout=200): #here msg is an array
                submit_tasks(msg, executor, producer)
                time.sleep(..)

def submit_tasks(msg, executor):
    futures = {executor.submit(download_objs, key, kafka_producer): key for key in msg['fileNames']}

def download_objs(fileNames, kafka_producer):
    # Download the file as obj
    for line in gzip.GzipFile(fileobj=f):
        json_string = line.decode('UTF-8')
        json_obj = json.loads(json_string)
        await kafka_producer.send(topic="test", value=json_obj) ##This gives error and how can i efficitently convert this to asyncio

Как я могу преобразовать это с помощью asyncio, чтобы точно выполнить процесс исполнителя пула потоков. я новичок так. Еще раз спасибо.

0 ответов

Другие вопросы по тегам