Разъем Kafka - не может остановить перебалансировку

Я использую Kafka Connector Confluent 3.0.1 версии. Я создаю новую группу с именем new-group и в ней около 20 тем. Большинство из этих тем заняты. Но очень жаль, что когда я запускаю фреймворк коннектора, система не может остановить балансировку, около 2 минут - балансировка по всем темам. Я не знаю причину. Часть сообщения об ошибке:

[2017-01-03 21:43:57,718] ERROR Commit of WorkerSinkTask{id=new-connector-0} offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
:

Я не знаю, имеет ли это какое-либо отношение к постоянному перебалансированию.

Я знаю, что если KafkaConsumer.poll() длиннее настроенного тайм-аута, kafka отменит раздел, и, таким образом, будет проведен повторный баланс, но я совершенно уверен, что опрос каждого времени не такой длительный. Кто-нибудь может дать мне несколько подсказок?

2 ответа

Я думаю max.poll.records может решить эту проблему. Это настроить количество записей, которые должны быть обработаны на каждой итерации цикла. В 0.10 есть max.poll.records, который устанавливает верхнюю границу количества записей, возвращаемых при каждом вызове.

Также в соответствии с Confluent для customer.poll() должно быть достаточно большое время ожидания сеанса, например, от 30 до 60 секунд.

Вы также можете настроить

session.timeout.ms
heartbeat.interval.ms 
max.partition.fetch.bytes

Рассмотрите возможность обновления до 0.10.1 или выше, поскольку потребитель в этих выпусках был расширен, чтобы лучше обрабатывать более длинные периоды между вызовами poll().

Вы можете увеличить новый max.poll.interval.ms параметр, если вы занимаете более 5 минут, чтобы поместить результаты в HDFS. Это предотвратит исключение вашего потребителя из группы потребителей за отсутствие прогресса.

В примечаниях к выпуску 0.10.1 написано

Новый Java Consumer теперь поддерживает сердцебиение из фонового потока. Существует новая конфигурация max.poll.interval.ms, которая контролирует максимальное время между вызовами опроса до того, как потребитель проактивно покинет группу (5 минут по умолчанию). Значение конфигурации request.timeout.ms всегда должно быть больше, чем max.poll.interval.ms, потому что это максимальное время, которое запрос JoinGroup может блокировать на сервере, пока потребитель перебалансируется, поэтому мы изменили его значение по умолчанию чуть выше 5 минут. Наконец, значение по умолчанию для session.timeout.ms было уменьшено до 10 секунд, а значение по умолчанию для max.poll.records было изменено на 500.

Другие вопросы по тегам