Кафка 0.10.1.0 проблема перебалансировки нескольких групп потребителей

В настоящее время я сталкиваюсь с проблемой восстановления баланса Кафки, которая занимает много времени и застревает, и poll() на потребителя блокируется.

Я использую 3 группы потребителей Kafka для 3 различных типов шаблонов, на которые я подписываюсь.
3 шаблона регулярных выражений: ABC_.*_A, ABC_.*_B, ABC_.*_C, Это означает, что я могу иметь несколько тем в ConsumerGroup1 (ABC_.*_A) такие как ABC_BOOKS_1_A, ABC_MUSIC_1_Aи т. д. Аналогично для двух других групп потребителей.

Я хотел бы призвать poll() по определенной группе потребителей итеративно в течение определенного количества раз; так что я могу потреблять из определенной группы потребителей больше раз, чем другие. Например: ConsumerGroup1: 5 раз, ConsumerGroup2: 2 раза, ConsumerGroup3: 1 раз.

я poll() на потребителя для каждого 100ms и получить максимум 500 записывает каждый раз в цикле для каждой группы потребителей, чтобы обеспечить более быстрое переключение между каждой группой потребителей.

Ниже приведен фрагмент кода:

public class KConsumerRunner implements Runnable {

  ...

  @Override
  public void run() {
    ...
    try {
      KafkaConsumer<String, Object> kafkaConsumer;
      for (String consumerGroupName : this.kafkaConsumerMap.keySet()) {
        kafkaConsumer = this.kafkaConsumerMap.get(consumerGroupName);
        int timeSlices = this.priorityTimeSliceMap.get(consumerGroupName);

        for (int i = 0; i < timeSlices; i++) {
          ConsumerRecords<String, Object> records = kafkaConsumer.poll(100);
          if (records.isEmpty())
            break;
          this.processor.process(records);
        }
      }
    }
    catch (WakeupException we) {
    ...
    }
    ...
  }
}

Приведенный выше код работает с 4 threads, Мои другие параметры конфигурации следующие:

enable.auto.commit = true
auto.offset.reset = earliest
auto.commit.interval.ms = 10000
session.timeout.ms = 30000
metadata.max.age.ms = 10000
max.poll.records = 500
max.partition.fetch.bytes = 31457280

На Кафки server.log Я вижу следующее сообщение, после которого оно застревает:

[2017-05-29 11:58:13,001] INFO [GroupCoordinator 0]: Preparing to restabilize group ABC_A with old generation 25 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:13,001] INFO [GroupCoordinator 0]: Stabilized group ABC_C generation 28 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:13,004] INFO [GroupCoordinator 0]: Assignment received from leader for group ABC_C for generation 28 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:13,006] INFO [GroupCoordinator 0]: Preparing to restabilize group ABC_C with old generation 28 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:13,007] INFO [GroupCoordinator 0]: Stabilized group ABC_C generation 29 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:13,008] INFO [GroupCoordinator 0]: Assignment received from leader for group ABC_C for generation 29 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,011] INFO [GroupCoordinator 0]: Stabilized group ABC_B generation 28 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,012] INFO [GroupCoordinator 0]: Assignment received from leader for group ABC_B for generation 28 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,014] INFO [GroupCoordinator 0]: Preparing to restabilize group ABC_B with old generation 28 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,014] INFO [GroupCoordinator 0]: Stabilized group ABC_B generation 29 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,015] INFO [GroupCoordinator 0]: Assignment received from leader for group ABC_B for generation 29 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,021] INFO [GroupCoordinator 0]: Preparing to restabilize group ABC_C with old generation 29 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,411] INFO [GroupCoordinator 0]: Stabilized group ABC_A generation 26 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,412] INFO [GroupCoordinator 0]: Assignment received from leader for group ABC_A for generation 26 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,419] INFO [GroupCoordinator 0]: Preparing to restabilize group ABC_B with old generation 29 (kafka.coordinator.GroupCoordinator)

Кто-нибудь знает, почему Кафка бесконечно перебалансируется и застревает и почему poll() метод на потребителя блокируется и не потребляет никаких сообщений?

0 ответов

Другие вопросы по тегам