Как получить последнее значение смещения от потребителя confluent_python AVRO

Я довольно новичок в confluent_kafka, но я приобрел некоторый опыт работы с kafka-python. То, что я пытаюсь сделать, это изменить смещение, где начать потреблять сообщения. Вот почему я хотел бы создать потребительский клиент, способный вернуться к предыдущим сообщениям, чтобы возвращать данные, которые будут заполнять информационную панель. Сказал, что с помощью пакета kafka-python я могу использовать seek_to_end ( https://github.com/dpkp/kafka-python/blob/c0fddbd24269d4333e3b6630a23e86ffe33dfcb6/kafka/consumer/group.py) метод для получения значения позиции последнего коммита. Имея это, я могу вычесть значения и вернуться к предыдущим сообщениям, используя seek метод ( https://github.com/dpkp/kafka-python/blob/c0fddbd24269d4333e3b6630a23e86ffe33dfcb6/kafka/consumer/group.py)

С другой стороны, у conflient_kafka, похоже, нет похожих функций, и я обнаружил, что до сих пор я использовал переменные OFFSET_END со значением -1, и он не возвращает мне числовое значение смещения последнего и наибольшего один. Я также могу использовать функцию 'seek', но мне нужен способ получить числовое значение последнего смещения, а не -1,

Мой авро потребитель выглядит так

from confluent_kafka.avro import AvroConsumer

if __name__ == '__main__':
     c = AvroConsumer({"bootstrap.servers": "locahost:29092", "group.id":"mygroup",'schema.registry.url': 'http://localhost:8081',
                  'enable.auto.commit': True,'default.topic.config': {'auto.offset.reset': 'smallest'}})

def my_assign (consumer, partitions):
    for p in partitions:
        p.offset = confluent_kafka.OFFSET_END
        print("offset=",p.offset)
    print('assign', partitions)
    print('position:',consumer.position(partitions))
    consumer.assign(partitions)

c.subscribe(["mytopic"],on_assign=my_assign)

while True:
    m = c.poll(1)
    if m is None:
        continue

    if m.error() is None:
        print('Received message', m.value(),m.offset())
c.close()

который дает следующий результат:

offset= -1
assign [TopicPartition{topic=mytopic,partition=0,offset=-1,error=None}]
position: [TopicPartition{topic=mytopic,partition=0,offset=-1001,error=None}]

и остается в ожидании следующего сообщения. Мне было интересно, если кто-нибудь может мне помочь. Спасибо

0 ответов

Вы можете использовать Consumer.get_watermark_offsets(см. документы)

Пример:

cfg = {
    # ... ...
    "group.id": str(uuid4())
}
consumer = AvroConsumer(cfg)
topic_partition = TopicPartition("topic-name", partition=123)
low, high = consumer.get_watermark_offsets(topic_partition)
print("the latest offset is {}".format(high))

Пожалуйста, вы можете предоставить свое решение. Следующее не работает (продолжайте получать старые сообщения):

from confluent_kafka.avro import AvroConsumer

c = AvroConsumer({"bootstrap.servers": "locahost:29092", "group.id":"mygroup",'schema.registry.url': 'http://localhost:8081',
'enable.auto.commit': True,'default.topic.config': {'auto.offset.reset': 'latest'}})

c.subscribe(["mytopic"])

while True:
    m = c.poll(1)
    if m is None:
        continue

    if m.error() is None:
        print('Received message', m.value(),m.offset())
c.close()

Обязательна ли часть с OFFSET_END? большое спасибо вам

Другие вопросы по тегам