Кафка правильный способ опроса нет записей

Для поддержания моего потребителя в живых (очень длинная обработка переменной длины) я реализую пустой вызов poll() в фоновом потоке, который будет удерживать брокера от перебалансировки, если я потрачу слишком много времени между опросами (). Я установил очень большой интервал опроса, но я не хочу просто увеличивать его навсегда для более длительной и длительной обработки.

Как правильно опросить без записей? В настоящее время я вызываю poll(), а затем повторно обращаюсь к самым ранним смещениям для каждого раздела, возвращенного в вызове poll(), чтобы они могли правильно считываться основным потоком после завершения обработки предыдущих сообщений.

ConsumerRecords<String, String> msgs = kafkaConsumer.poll(timeout);
Map<Integer, Long> partitionToOffsets = getEarliestPartitionOffsets(msgs); // helper method
seekToOffsets(partitionToOffsets);

1 ответ

Решение

Правильный способ справиться с длительным временем обработки (и избежать перебалансировки потребителей) - это использовать KafkaConsumer.pause() / KafkaConsumer.resume() методы. Вы можете прочитать больше об этом здесь:

Другие вопросы по тегам