Потребительский баланс при использовании автокоммитов

Мы используем потребительский клиент kafka 0.10.2.0 со следующей конфигурацией:

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

Итак, как вы можете видеть, мы используем автокоммит. Используемая версия потребительского API имеет специальный поток для выполнения автоматической фиксации. Таким образом, каждую секунду у нас есть автокоммит, что означает, что у нас пульс каждую секунду.

Наше время обработки заявки может фактически (время от времени) занимать более 40 секунд (интервал ожидания запроса)

Я хотел спросить:

1 - если время обработки займет, например, минуту. будет ли восстановлен баланс, хотя каждую секунду происходит автоматическая фиксация сердцевины бобов?

2. Что еще более странно, так это то, что в случае длительного времени выполнения мы получаем одно и то же сообщение более одного раза. Это нормально? Если потребитель совершил смещение, почему для сальдо снова используется то же смещение?

Спасибо орёл

3 ответа

Решение

Ты можешь использовать KafkaConsumer.pause() / KafkaConsumer.resume() для предотвращения перебалансировки потребителя во время длительных пауз обработки. JavaDocs. Посмотрите на этот вопрос.

СР.2. Вы уверены, что эти смещения зафиксированы?

Начиная с Kafka v0.10.1.0, вам не нужно вручную запускать автоматическую фиксацию, чтобы делать биение сердца. Сам потребитель Kafka запускает новую нить для механизма биения сердца в фоновом режиме. Чтобы узнать больше, прочитайте KIP-62.

В вашем случае вы можете установить max.poll.interval.ms до максимального времени, затраченного вашим процессором для обработки max.poll.record записей.

Просто чтобы уточнить, проверка AutoCommit вызывается в каждом опросе и проверяет, что прошедшее время больше настроенного времени, если да, то только он выполняет фиксацию.

Например. если интервал коммита 5 секунд, а опрос происходит через 7 секунд, то в этом случае коммит произойдет через 7 секунд

На ваши вопросы

  1. Автоматическая фиксация не учитывается для пульса, если длительное время обработки очевидно, что фиксация не произойдет и приведет к тайм-ауту сеанса, что, в свою очередь, вызывает перебалансировку

  2. Этого не должно произойти, если вы не ищете / сбрасываете смещение к ранее зафиксированному смещению или произошел ребалансировка

Другие вопросы по тегам