Поиск потребителей kafka не работает: AssertionError: Неназначенный раздел

Потребитель кафки con определенный ниже, работает отлично, когда я пытаюсь получать сообщения из моей темы; Тем не менее, это вызывает у меня проблемы, когда я пытаюсь изменить смещение, используя seek метод или любой из его вариантов. т.е. seek_to_beginning, seek_to_end

from kafka import KafkaConsumer, TopicPartition

con = KafkaConsumer(my_topic, bootstrap_servers = my_bootstrapservers, group_id = my_groupid)
p = con.partitions_for_topic(my_topic)
my_partition = p.pop()
tp = TopicPartition(topic = my_topic, partition = my_partition)
print ('*** tp: ', tp)
con.seek_to_beginning(tp)

он генерирует следующий вывод и ошибку ниже:

*** tp:  TopicPartition(topic='mytopic', partition=0)
File "/myhome/anaconda3/lib/python3.6/site-packages/kafka/consumer/group.py", line 735, in seek_to_beginning
assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition

Ошибка "Неназначенный раздел" не выглядит для меня действительной, потому что я просто получаю раздел и раздел темы от самого потребителя и передаю его обратно в метод поиска, чтобы он был определенно назначен. есть идеи?

1 ответ

Решение

Мне удалось выяснить способ поиска по разделу моей темы, изменив, как раздел назначается моему потребителю. Кажется, проблема в том, что потребитель не мог работать с разделом темы, на который была подписана по умолчанию, поэтому мне пришлось изменить конструктор, чтобы запретить подписку на тему по умолчанию, а затем явно назначить раздел для моего потребителя.

from kafka import KafkaConsumer, TopicPartition

con = KafkaConsumer(bootstrap_servers = my_bootstrapservers)
tp = TopicPartition(my_topic, 0)
con.assign([tp])
con.seek_to_beginning()
con.seek(tp, 1000000)
Другие вопросы по тегам