Async не работает при использовании pykafka и asyncio
Я пытаюсь вызвать несколько потребительских функций pykafka, используя async. Однако первая функция потребителя pykafka заблокирует работу другой функции.
QueueConsumer lib:
import json
from pykafka import KafkaClient
import configparser
import asyncio
class QueueConsumer(object):
def __init__(self):
config = configparser.ConfigParser()
config.read('config.ini')
self.config = config
async def test(self):
defaultTopic = 'test'
client = KafkaClient(hosts=self.config['kafka']['host'])
topic = client.topics[defaultTopic.encode('utf-8')]
consumer = topic.get_simple_consumer()
# msg = next(consumer)
for message in consumer:
print(defaultTopic+' '+message.value.decode("utf-8"))
async def coba(self):
defaultTopic = 'coba'
client = KafkaClient(hosts=self.config['kafka']['host'])
topic = client.topics[defaultTopic.encode('utf-8')]
consumer = topic.get_simple_consumer()
# msg = next(consumer)
for message in consumer:
print(defaultTopic+' '+message.value.decode("utf-8"))
Затем я вызываю эти функции, используя:
import asyncio
queueConsumer = QueueConsumer()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
queueConsumer.test(),
queueConsumer.coba(),
))
loop.close()
Результат вернет только сообщение очереди из темы "test".
Редактировать: я пытаюсь добавить еще одну функцию
async def factorial(self, name, number):
f = 1
for i in range(2, number+1):
print("Task %s: Compute factorial(%s)..." % (name, i))
await asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))
А потом называется как:
queueConsumer.test(),
queueConsumer.coba(),
queueConsumer.factorial('a',3),
queueConsumer.factorial('b',5),
queueConsumer.factorial('c',7),
Некоторая печать из факториальной функции выполняется. Но когда вызывается печать из test или coba, тогда другие просто останавливаются.
1 ответ
SimpleConsumer.consume
является блокирующим вызовом, поэтому вам нужно будет настроить свой код так, чтобы он периодически проверял наличие новых сообщений, оставляя контроль между опросами для выполнения других асинхронных операций. Одним из способов достижения этой цели является use_greenlets=True
Kwarg на KafkaClient
полагаясь на gevent для обработки потока управления между несколькими асинхронными операциями.