Потребитель на основе confluent-kafka в Python не работает

Очень плохо знакомы с кафкой и авро. Я застрял с проблемой и не могу понять, что здесь происходит не так. Я написал производителя и потребителя kafka, который использует Avro в качестве формата сериализации. Код производителя работает правильно. Как после запуска этого кода, когда я запускаю kafka-avro-console-consumer это дает мне следующее -

bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test --property schema.registry.url=http://127.0.0.1:8081 --from-beginning
{"name":{"string":"Hello World!"}}
{"name":{"string":"Hello World!"}}
{"name":{"string":"Hello World!"}}

Однако, когда я пытаюсь сделать то же самое с помощью Python (следуя этому самому простому примеру), я пишу следующий код -

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


class AvroConsumerAdapter(object):

    def __init__(self, topic='test'):
        self.topic = topic
        self.consumer = AvroConsumer({'bootstrap.servers': 'localhost:9092',
                                      'schema.registry.url': 'http://127.0.0.1:8081',
                                      'group.id': 'mygroup'})
        self.consumer.subscribe([topic])

    def start_consuming(self):
        running = True
        while running:
            try:
                msg = self.consumer.poll(10)
                if msg:
                    print(msg.value())
                    if not msg.error():
                        print("Here - 1")
                        print(msg.value())
                    elif msg.error().code() != KafkaError._PARTITION_EOF:
                        print("here-2")
                        print(msg.error())
                        running = False
                    else:
                        print('Here-3')
                        print(msg.error())
            except SerializerError as e:
                print("Message deserialization failed for %s: %s" % (msg, e))
                running = False
            except Exception as ex:
                print(ex)
                running = False

        self.consumer.close()

Этот клиент остается там навсегда и никогда ничего не печатает. Я не уверен, что здесь не так. Может кто-нибудь, пожалуйста, помогите мне в этом.

1 ответ

Решение

Проверьте параметры конфигурации темы - вам нужно установить auto.offset.reset': 'smallest' если вы хотите обработать все данные в данный момент в теме. По умолчанию это largest Это означает, что он покажет только новые строки произведенных данных. Вы можете убедиться в этом, оставив текущий код Python запущенным и создав новые сообщения для этой темы - вы должны увидеть, как код Python забирает их.

Другие вопросы по тегам