Кафка 0.10.1.0 проблема перебалансировки нескольких групп потребителей
В настоящее время я сталкиваюсь с проблемой восстановления баланса Кафки, которая занимает много времени и застревает, и poll()
на потребителя блокируется.
Я использую 3 группы потребителей Kafka для 3 различных типов шаблонов, на которые я подписываюсь.
3 шаблона регулярных выражений: ABC_.*_A
, ABC_.*_B
, ABC_.*_C
, Это означает, что я могу иметь несколько тем в ConsumerGroup1 (ABC_.*_A
) такие как ABC_BOOKS_1_A
, ABC_MUSIC_1_A
и т. д. Аналогично для двух других групп потребителей.
Я хотел бы призвать poll()
по определенной группе потребителей итеративно в течение определенного количества раз; так что я могу потреблять из определенной группы потребителей больше раз, чем другие. Например: ConsumerGroup1: 5 раз, ConsumerGroup2: 2 раза, ConsumerGroup3: 1 раз.
я poll()
на потребителя для каждого 100ms
и получить максимум 500
записывает каждый раз в цикле для каждой группы потребителей, чтобы обеспечить более быстрое переключение между каждой группой потребителей.
Ниже приведен фрагмент кода:
public class KConsumerRunner implements Runnable {
...
@Override
public void run() {
...
try {
KafkaConsumer<String, Object> kafkaConsumer;
for (String consumerGroupName : this.kafkaConsumerMap.keySet()) {
kafkaConsumer = this.kafkaConsumerMap.get(consumerGroupName);
int timeSlices = this.priorityTimeSliceMap.get(consumerGroupName);
for (int i = 0; i < timeSlices; i++) {
ConsumerRecords<String, Object> records = kafkaConsumer.poll(100);
if (records.isEmpty())
break;
this.processor.process(records);
}
}
}
catch (WakeupException we) {
...
}
...
}
}
Приведенный выше код работает с 4 threads
, Мои другие параметры конфигурации следующие:
enable.auto.commit = true
auto.offset.reset = earliest
auto.commit.interval.ms = 10000
session.timeout.ms = 30000
metadata.max.age.ms = 10000
max.poll.records = 500
max.partition.fetch.bytes = 31457280
На Кафки server.log
Я вижу следующее сообщение, после которого оно застревает:
[2017-05-29 11:58:13,001] INFO [GroupCoordinator 0]: Preparing to restabilize group ABC_A with old generation 25 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:13,001] INFO [GroupCoordinator 0]: Stabilized group ABC_C generation 28 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:13,004] INFO [GroupCoordinator 0]: Assignment received from leader for group ABC_C for generation 28 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:13,006] INFO [GroupCoordinator 0]: Preparing to restabilize group ABC_C with old generation 28 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:13,007] INFO [GroupCoordinator 0]: Stabilized group ABC_C generation 29 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:13,008] INFO [GroupCoordinator 0]: Assignment received from leader for group ABC_C for generation 29 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,011] INFO [GroupCoordinator 0]: Stabilized group ABC_B generation 28 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,012] INFO [GroupCoordinator 0]: Assignment received from leader for group ABC_B for generation 28 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,014] INFO [GroupCoordinator 0]: Preparing to restabilize group ABC_B with old generation 28 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,014] INFO [GroupCoordinator 0]: Stabilized group ABC_B generation 29 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,015] INFO [GroupCoordinator 0]: Assignment received from leader for group ABC_B for generation 29 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,021] INFO [GroupCoordinator 0]: Preparing to restabilize group ABC_C with old generation 29 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,411] INFO [GroupCoordinator 0]: Stabilized group ABC_A generation 26 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,412] INFO [GroupCoordinator 0]: Assignment received from leader for group ABC_A for generation 26 (kafka.coordinator.GroupCoordinator)
[2017-05-29 11:58:18,419] INFO [GroupCoordinator 0]: Preparing to restabilize group ABC_B with old generation 29 (kafka.coordinator.GroupCoordinator)
Кто-нибудь знает, почему Кафка бесконечно перебалансируется и застревает и почему poll()
метод на потребителя блокируется и не потребляет никаких сообщений?