Как отлаживать AvroConsumer в конфлюентной кафке?

Я пытаюсь прочитать Kafka из Python, но получаю сообщение "Нет", "Нет ошибок в интерфейсе командной строки". Я использую переадресацию портов на целевой хост через замазку, а затем тестирую порты через telnet - все работает нормально. Более того, я использую kafkacat в Debian (WSL), и он работает нормально!

kafkacat -C -b localhost:9092 -t topic1 -p 0 -o beginning -s avro -r http://localhost:28081

Я использую PyCharm, мой код ниже по тексту. Как мне отлаживать?

from confluent_kafka.avro import AvroConsumer
from confluent_kafka import TopicPartition
from confluent_kafka.avro.serializer import SerializerError

topics = ['topic1', 'topic2']
c = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'smallest',
    'schema.registry.url': 'http://localhost:28081',
    'api.version.request': True
})

c.subscribe(topics)
tp = TopicPartition(topics[0], 0, 0)
c.assign([tp])

while True:
    try:
        msg = c.poll(1)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        print('Message None')
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()

как

1 ответ

Первое, что я сделаю, это проверю, приходят ли сообщения по вашим темам с помощью kafka-avro-console-consumer инструмент.

Затем в своем приложении вы можете попробовать увеличить уровень журнала:

c = AvroConsumer({
    # ... your config here
    'log_level': 7,
    'debug': 'all',
})

Вы можете увидеть различные параметры здесь: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Но я считаю, что ваша проблема связана с тем, как вы назначаете разделы. Если вы используетеsubscribeчем разделы автоматически назначаются вашему потребителю кластером. Вы можете добавить обратный вызов при подписке, вы можете видеть, какие разделы назначены вашему потребителю, но вам не нужно делать это самостоятельно. См. https://docs.confluent.io/3.1.1/clients/confluent-kafka-python/index.html

Другие вопросы по тегам