Когда я приостанавливаю потребителя, другой потребитель из той же группы потребителей должен получить сообщения

Я пытаюсь работать над случаем, когда есть два или более потребителей, слушающих одну общую тему, содержащую только один раздел. Я приостанавливаю одного из потребителей, в это время другие потребители, которые не находятся в режиме паузы, должны иметь возможность получать сообщения из темы. Я наблюдаю, что только когда возобновленный приостановленный потребитель, только тогда один из других активных потребителей может получать сообщения. Как мне этого добиться через весеннюю какфку. Я использую Spring kafka 2.0.0.M2.

Ниже моя строка кода

public class listener{
    @KafkaListener(id = "id2", topics = "abcd", group = "group1", containerFactory = "kafkaListenerContainerFactory")
        public void listenPartition1(String data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) Long offsets, Acknowledgment acknowledgment, Consumer consumer) throws InterruptedException {
            count_consumer2 = count_consumer2 + 1;
            if(count_consumer2 == 10) {
                consumer.pause(consumer.assignment());
            }
            acknowledgment.acknowledge();

    }

    @KafkaListener(id = "id1", topics = "abcd", group = "group1", containerFactory = "kafkaListenerContainerFactory")
        public void listenPartition1(String data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) Long offsets, Acknowledgment acknowledgment, Consumer consumer) throws InterruptedException {
            count_consumer1 = count_consumer1 + 1;
            if(count_consumer1 == 5) {
                consumer.pause(consumer.assignment());
                while(data.database <10){
                      consumer.resume(consumer.assignment());
                 }
            }

             acknowledgment.acknowledge();



        }
}

2 ответа

Извлекайте документы из метода pause():

Приостановить выборку из запрошенных разделов. Будущие вызовы poll(long) не будут возвращать никаких записей из этих разделов, пока они не будут возобновлены с помощью resume(Collection).

Обратите внимание, что этот метод не влияет на подписку раздела. В частности, это не приводит к перебалансированию группы при использовании автоматического назначения.

метод pause не будет инициировать перебалансировку раздела, поэтому другие потребители в этой группе не будут использовать этот раздел, и обратите внимание, что раздел будет использоваться только одним потребителем в той же группе потребителей. Так что в вашем случае работает только потребитель, другие бездействуют.

Вы должны перераспределить вашу тему, чтобы иметь как минимум столько разделов, сколько у вас активных взаимоисключающих потребителей (т.е. тот же group.id). Именно так и работает Кафка.

Другие вопросы по тегам