Async не работает при использовании pykafka и asyncio

Я пытаюсь вызвать несколько потребительских функций pykafka, используя async. Однако первая функция потребителя pykafka заблокирует работу другой функции.

QueueConsumer lib:

import json
from pykafka import KafkaClient
import configparser

import asyncio


class QueueConsumer(object):

    def __init__(self):
        config = configparser.ConfigParser()
        config.read('config.ini')
        self.config = config

    async def test(self):
        defaultTopic = 'test'
        client = KafkaClient(hosts=self.config['kafka']['host'])
        topic = client.topics[defaultTopic.encode('utf-8')]
        consumer = topic.get_simple_consumer()
        # msg = next(consumer)
        for message in consumer:
            print(defaultTopic+' '+message.value.decode("utf-8"))

    async def coba(self):
        defaultTopic = 'coba'
        client = KafkaClient(hosts=self.config['kafka']['host'])
        topic = client.topics[defaultTopic.encode('utf-8')]
        consumer = topic.get_simple_consumer()
        # msg = next(consumer)
        for message in consumer:
            print(defaultTopic+' '+message.value.decode("utf-8"))

Затем я вызываю эти функции, используя:

import asyncio
queueConsumer = QueueConsumer()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    queueConsumer.test(),
    queueConsumer.coba(),
))
loop.close()

Результат вернет только сообщение очереди из темы "test".

Редактировать: я пытаюсь добавить еще одну функцию

async def factorial(self, name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

А потом называется как:

    queueConsumer.test(),
queueConsumer.coba(),
queueConsumer.factorial('a',3),
queueConsumer.factorial('b',5),
queueConsumer.factorial('c',7),

Некоторая печать из факториальной функции выполняется. Но когда вызывается печать из test или coba, тогда другие просто останавливаются.

1 ответ

SimpleConsumer.consume является блокирующим вызовом, поэтому вам нужно будет настроить свой код так, чтобы он периодически проверял наличие новых сообщений, оставляя контроль между опросами для выполнения других асинхронных операций. Одним из способов достижения этой цели является use_greenlets=True Kwarg на KafkaClientполагаясь на gevent для обработки потока управления между несколькими асинхронными операциями.

Другие вопросы по тегам