Как отлаживать AvroConsumer в конфлюентной кафке?
Я пытаюсь прочитать Kafka из Python, но получаю сообщение "Нет", "Нет ошибок в интерфейсе командной строки". Я использую переадресацию портов на целевой хост через замазку, а затем тестирую порты через telnet - все работает нормально. Более того, я использую kafkacat в Debian (WSL), и он работает нормально!
kafkacat -C -b localhost:9092 -t topic1 -p 0 -o beginning -s avro -r http://localhost:28081
Я использую PyCharm, мой код ниже по тексту. Как мне отлаживать?
from confluent_kafka.avro import AvroConsumer
from confluent_kafka import TopicPartition
from confluent_kafka.avro.serializer import SerializerError
topics = ['topic1', 'topic2']
c = AvroConsumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'smallest',
'schema.registry.url': 'http://localhost:28081',
'api.version.request': True
})
c.subscribe(topics)
tp = TopicPartition(topics[0], 0, 0)
c.assign([tp])
while True:
try:
msg = c.poll(1)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
print('Message None')
continue
if msg.error():
print("AvroConsumer error: {}".format(msg.error()))
continue
print(msg.value())
c.close()
как
1 ответ
Первое, что я сделаю, это проверю, приходят ли сообщения по вашим темам с помощью kafka-avro-console-consumer
инструмент.
Затем в своем приложении вы можете попробовать увеличить уровень журнала:
c = AvroConsumer({
# ... your config here
'log_level': 7,
'debug': 'all',
})
Вы можете увидеть различные параметры здесь: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Но я считаю, что ваша проблема связана с тем, как вы назначаете разделы. Если вы используетеsubscribe
чем разделы автоматически назначаются вашему потребителю кластером. Вы можете добавить обратный вызов при подписке, вы можете видеть, какие разделы назначены вашему потребителю, но вам не нужно делать это самостоятельно. См. https://docs.confluent.io/3.1.1/clients/confluent-kafka-python/index.html