aiokafka завершает работу при работе в многопроцессорном классе
Сегодня я бился головой о стену, пытаясь понять, почему это не работает. Я создал этот класс многопроцессорности:
class Consumer(multiprocessing.Process):
def __init__(self, topic, **kwargs):
self.topic = topic
super(Consumer, self).__init__(**kwargs)
def _deserializer(serialized):
return json.loads(serialized)
async def _consume(self):
consumer = AIOKafkaConsumer(
self.topic,
# group_id=None,
group_id="Deployment",
value_deserializer=self._deserializer,
bootstrap_servers='localhost:30322',
)
await consumer.start()
tasks = []
try:
async for msg in consumer:
logging.info("***** reading message *****")
tasks.append(asyncio.create_task(process_msg(msg, 1)))
finally:
await consumer.stop()
await asyncio.gather(*tasks)
def run(self):
asyncio.run(self._consume())
И мой основной файл делает это:
num_procs = 1
processes = [Consumer("deployment_requests") for _ in range(num_procs)]
for p in processes:
p.start()
for p in processes:
logging.info(f'pid is {p.pid}')
for p in processes:
p.join()
logging.info(f'pid is {p.pid}')
И вывод
2023-05-02 16:03:58 - INFO - pid is 23520
2023-05-02 16:04:03 - INFO - Updating subscribed topics to: frozenset({'deployment_requests'})
2023-05-02 16:04:03 - INFO - Discovered coordinator 0 for group Deployment
2023-05-02 16:04:03 - INFO - Revoking previously assigned partitions set() for group Deployment
2023-05-02 16:04:03 - INFO - (Re-)joining group Deployment
2023-05-02 16:04:03 - INFO - Joined group 'Deployment' (generation 182) with member_id aiokafka-0.8.0-e331a252-b6cd-4521-9140-6bb70cf9e838
2023-05-02 16:04:03 - INFO - Elected group leader -- performing partition assignments using roundrobin
2023-05-02 16:04:03 - INFO - Successfully synced group Deployment with generation 182
2023-05-02 16:04:03 - INFO - Setting newly assigned partitions {TopicPartition(topic='deployment_requests', partition=0)} for group Deployment
2023-05-02 16:04:03 - INFO - LeaveGroup request succeeded
Process Consumer-1:
2023-05-02 16:04:04 - INFO - pid is 23520
Если я вынесу код из класса, этот код будет работать так, как ожидалось. Но в нынешнем виде он даже не печатает «****** чтение сообщения *****», поэтому он даже не ожидает сообщений. Поэтому я думаю, что это как-то связано с тем, что p.start() неправильно использует метод run() для вызова asyncio. А может быть и совсем другое :)
Вот журналы производителя, но на стороне производителя проблем нет.
[2023-05-02 21:04:03,943] INFO [GroupCoordinator 0]: Stabilized group Deployment generation 182 (__consumer_offsets-15) (kafka.coordinator.group.GroupCoordinator)
[2023-05-02 21:04:03,947] INFO [GroupCoordinator 0]: Assignment received from leader for group Deployment for generation 182 (kafka.coordinator.group.GroupCoordinator)
[2023-05-02 21:04:03,966] INFO [GroupCoordinator 0]: Member[group.instance.id None, member.id aiokafka-0.8.0-e331a252-b6cd-4521-9140-6bb70cf9e838] in group Deployment has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2023-05-02 21:04:03,966] INFO [GroupCoordinator 0]: Preparing to rebalance group Deployment in state PreparingRebalance with old generation 182 (__consumer_offsets-15) (reason: removing member aiokafka-0.8.0-e331a252-b6cd-4521-9140-6bb70cf9e838 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
1 ответ
Наконец понял это. Когда я превратил потребителя в класс, десериализатор сломал его. Как только я удалил десериализатор, он начал работать как положено.