aiokafka завершает работу при работе в многопроцессорном классе

Сегодня я бился головой о стену, пытаясь понять, почему это не работает. Я создал этот класс многопроцессорности:

      class Consumer(multiprocessing.Process):
def __init__(self, topic, **kwargs):
    self.topic = topic
    super(Consumer, self).__init__(**kwargs)

def _deserializer(serialized):
    return json.loads(serialized)

async def _consume(self):
    consumer = AIOKafkaConsumer(
        self.topic,
        # group_id=None,
        group_id="Deployment",
        value_deserializer=self._deserializer,
        bootstrap_servers='localhost:30322',
    )

    await consumer.start()

    tasks = []
    try:
        async for msg in consumer:
            logging.info("***** reading message *****")
            tasks.append(asyncio.create_task(process_msg(msg, 1)))
    finally:
        await consumer.stop()
    await asyncio.gather(*tasks)

def run(self):
    asyncio.run(self._consume())

И мой основной файл делает это:

      num_procs = 1
processes = [Consumer("deployment_requests") for _ in range(num_procs)]

for p in processes:
    p.start()

for p in processes:
    logging.info(f'pid is {p.pid}')

for p in processes:
    p.join()
    logging.info(f'pid is {p.pid}')

И вывод

      2023-05-02 16:03:58 - INFO    - pid is 23520
2023-05-02 16:04:03 - INFO    - Updating subscribed topics to: frozenset({'deployment_requests'})
2023-05-02 16:04:03 - INFO    - Discovered coordinator 0 for group Deployment
2023-05-02 16:04:03 - INFO    - Revoking previously assigned partitions set() for group Deployment
2023-05-02 16:04:03 - INFO    - (Re-)joining group Deployment
2023-05-02 16:04:03 - INFO    - Joined group 'Deployment' (generation 182) with member_id aiokafka-0.8.0-e331a252-b6cd-4521-9140-6bb70cf9e838
2023-05-02 16:04:03 - INFO    - Elected group leader -- performing partition assignments using roundrobin
2023-05-02 16:04:03 - INFO    - Successfully synced group Deployment with generation 182
2023-05-02 16:04:03 - INFO    - Setting newly assigned partitions {TopicPartition(topic='deployment_requests', partition=0)} for group Deployment
2023-05-02 16:04:03 - INFO    - LeaveGroup request succeeded
Process Consumer-1:
2023-05-02 16:04:04 - INFO    - pid is 23520

Если я вынесу код из класса, этот код будет работать так, как ожидалось. Но в нынешнем виде он даже не печатает «****** чтение сообщения *****», поэтому он даже не ожидает сообщений. Поэтому я думаю, что это как-то связано с тем, что p.start() неправильно использует метод run() для вызова asyncio. А может быть и совсем другое :)

Вот журналы производителя, но на стороне производителя проблем нет.

      [2023-05-02 21:04:03,943] INFO [GroupCoordinator 0]: Stabilized group Deployment generation 182 (__consumer_offsets-15) (kafka.coordinator.group.GroupCoordinator)
[2023-05-02 21:04:03,947] INFO [GroupCoordinator 0]: Assignment received from leader for group Deployment for generation 182 (kafka.coordinator.group.GroupCoordinator)
[2023-05-02 21:04:03,966] INFO [GroupCoordinator 0]: Member[group.instance.id None, member.id aiokafka-0.8.0-e331a252-b6cd-4521-9140-6bb70cf9e838] in group Deployment has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2023-05-02 21:04:03,966] INFO [GroupCoordinator 0]: Preparing to rebalance group Deployment in state PreparingRebalance with old generation 182 (__consumer_offsets-15) (reason: removing member aiokafka-0.8.0-e331a252-b6cd-4521-9140-6bb70cf9e838 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)

1 ответ

Наконец понял это. Когда я превратил потребителя в класс, десериализатор сломал его. Как только я удалил десериализатор, он начал работать как положено.

Другие вопросы по тегам