Когда я приостанавливаю потребителя, другой потребитель из той же группы потребителей должен получить сообщения
Я пытаюсь работать над случаем, когда есть два или более потребителей, слушающих одну общую тему, содержащую только один раздел. Я приостанавливаю одного из потребителей, в это время другие потребители, которые не находятся в режиме паузы, должны иметь возможность получать сообщения из темы. Я наблюдаю, что только когда возобновленный приостановленный потребитель, только тогда один из других активных потребителей может получать сообщения. Как мне этого добиться через весеннюю какфку. Я использую Spring kafka 2.0.0.M2.
Ниже моя строка кода
public class listener{
@KafkaListener(id = "id2", topics = "abcd", group = "group1", containerFactory = "kafkaListenerContainerFactory")
public void listenPartition1(String data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) Long offsets, Acknowledgment acknowledgment, Consumer consumer) throws InterruptedException {
count_consumer2 = count_consumer2 + 1;
if(count_consumer2 == 10) {
consumer.pause(consumer.assignment());
}
acknowledgment.acknowledge();
}
@KafkaListener(id = "id1", topics = "abcd", group = "group1", containerFactory = "kafkaListenerContainerFactory")
public void listenPartition1(String data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) Long offsets, Acknowledgment acknowledgment, Consumer consumer) throws InterruptedException {
count_consumer1 = count_consumer1 + 1;
if(count_consumer1 == 5) {
consumer.pause(consumer.assignment());
while(data.database <10){
consumer.resume(consumer.assignment());
}
}
acknowledgment.acknowledge();
}
}
2 ответа
Извлекайте документы из метода pause():
Приостановить выборку из запрошенных разделов. Будущие вызовы poll(long) не будут возвращать никаких записей из этих разделов, пока они не будут возобновлены с помощью resume(Collection).
Обратите внимание, что этот метод не влияет на подписку раздела. В частности, это не приводит к перебалансированию группы при использовании автоматического назначения.
метод pause не будет инициировать перебалансировку раздела, поэтому другие потребители в этой группе не будут использовать этот раздел, и обратите внимание, что раздел будет использоваться только одним потребителем в той же группе потребителей. Так что в вашем случае работает только потребитель, другие бездействуют.
Вы должны перераспределить вашу тему, чтобы иметь как минимум столько разделов, сколько у вас активных взаимоисключающих потребителей (т.е. тот же group.id). Именно так и работает Кафка.