Потребитель Kafka терпит неудачу при смещении и перебалансировке
У меня есть потребитель Kafka, который подписан только на одну тему. В определенный момент после правильной работы я получаю следующие сообщения в своих журналах:
Line 25694: 2017-05-15 17:59:53.656 [INFO ] [MeasureConsumerExecutor] AbstractCoordinator - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
Line 25739: 2017-05-15 18:01:39.745 [INFO ] [MeasureConsumerExecutor] AbstractCoordinator - Marking the coordinator 2147483647 dead.
Line 25740: 2017-05-15 18:01:39.745 [WARN ] [MeasureConsumerExecutor] ConsumerCoordinator - Auto offset commit failed: null
Line 25766: 2017-05-15 18:10:52.539 [INFO ] [MeasureConsumerExecutor] AbstractCoordinator - Marking the coordinator 2147483647 dead.
Line 25789: 2017-05-15 18:25:51.036 [INFO ] [MeasureConsumerExecutor] AbstractCoordinator - Marking the coordinator 2147483647 dead.
Line 25790: 2017-05-15 18:25:52.241 [WARN ] [MeasureConsumerExecutor] ConsumerCoordinator - Auto offset commit failed: null
Line 25796: 2017-05-15 18:31:10.354 [INFO ] [MeasureConsumerExecutor] AbstractCoordinator - Marking the coordinator 2147483647 dead.
Line 25797: 2017-05-15 18:31:24.101 [INFO ] [MeasureConsumerExecutor] EventConsumer - run() - WARN - msg: KafkaConsumer will be CLOSED!
Мой код действительно прост:
private final AtomicBoolean closed = new AtomicBoolean(false); ...
...
...
try {
while (!closed.get()) {
ConsumerRecords<String, Message> records = kafkaConsumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, Message> record : records) {
Message message = record.value();
messageArrived(message);
}
}
logger.info("run() - NOTIFY - msg: idConsumer = [{}] HAS !closed.get() = [{}]", consumerId, !closed.get());
} catch (WakeupException wakeupException) {
logger.error("run() - ERROR - msg: Error on Consumer [{}] caused by = [{}]", getConsumerId(), wakeupException.getMessage(), wakeupException);
// Ignore exception if closing
if (!closed.get())
throw wakeupException;
} catch (KafkaException kafkaException) {
logger.error("run() - ERROR - msg: Error on Consumer [" + getConsumerId() + "] caused by = [" + kafkaException.getMessage() + "]", kafkaException);
} catch (Exception exception) {
logger.error("run() - ERROR - msg: Error on Consumer [" + getConsumerId() + "] caused by = [" + exception.getMessage() + "]", exception);
} finally {
logger.info("run() - WARN - msg: KafkaConsumer will be CLOSED!");
if (null != kafkaConsumer) {
kafkaConsumer.close();
}
}
}
Странно то, что я получаю последний журнал WARN ("KafkaConsumer будет ЗАКРЫТ") без входа в журналы исключений (так что, очевидно, без исключений) и без изменения "закрытой" переменной нигде.
У меня есть несколько потребителей, которые работают так же, как этот, параллельно на разные темы, но я думаю, что это не актуально. Посредник находится на другой физической машине в той же подсети.
Не могли бы вы дать мне несколько советов о том, что здесь происходит, и как я могу решить эту проблему, чтобы предотвратить отключение потребителей или, по крайней мере, возможность восстановления после него?
Большое спасибо заранее.