Потребительский баланс при использовании автокоммитов
Мы используем потребительский клиент kafka 0.10.2.0 со следующей конфигурацией:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
Итак, как вы можете видеть, мы используем автокоммит. Используемая версия потребительского API имеет специальный поток для выполнения автоматической фиксации. Таким образом, каждую секунду у нас есть автокоммит, что означает, что у нас пульс каждую секунду.
Наше время обработки заявки может фактически (время от времени) занимать более 40 секунд (интервал ожидания запроса)
Я хотел спросить:
1 - если время обработки займет, например, минуту. будет ли восстановлен баланс, хотя каждую секунду происходит автоматическая фиксация сердцевины бобов?
2. Что еще более странно, так это то, что в случае длительного времени выполнения мы получаем одно и то же сообщение более одного раза. Это нормально? Если потребитель совершил смещение, почему для сальдо снова используется то же смещение?
Спасибо орёл
3 ответа
Начиная с Kafka v0.10.1.0, вам не нужно вручную запускать автоматическую фиксацию, чтобы делать биение сердца. Сам потребитель Kafka запускает новую нить для механизма биения сердца в фоновом режиме. Чтобы узнать больше, прочитайте KIP-62.
В вашем случае вы можете установить max.poll.interval.ms
до максимального времени, затраченного вашим процессором для обработки max.poll.record
записей.
Просто чтобы уточнить, проверка AutoCommit вызывается в каждом опросе и проверяет, что прошедшее время больше настроенного времени, если да, то только он выполняет фиксацию.
Например. если интервал коммита 5 секунд, а опрос происходит через 7 секунд, то в этом случае коммит произойдет через 7 секунд
На ваши вопросы
Автоматическая фиксация не учитывается для пульса, если длительное время обработки очевидно, что фиксация не произойдет и приведет к тайм-ауту сеанса, что, в свою очередь, вызывает перебалансировку
Этого не должно произойти, если вы не ищете / сбрасываете смещение к ранее зафиксированному смещению или произошел ребалансировка