Кафка ребалансировки и подводные камни для слушателей

Я читаю "Кафку: полное руководство" и хотел бы лучше понять слушателя с перебалансировкой. Пример в книге простой использует HashMap сохранить текущие смещения, которые были обработаны и будут фиксировать текущее состояние при отзыве раздела. Мои опасения:

Есть две проблемы / вопросы, которые у меня есть вокруг примера кода:

  1. Используемый язык заставляет меня предположить, что эти обратные вызовы сделаны в другом потоке. Итак, не следует ли учитывать безопасность потоков при применении текущих смещений? Кроме того, не следует ли отменить текущий пакет после его фиксации?
  2. В нем говорится, что необходимо использовать commitSync, чтобы убедиться, что смещения фиксируются до того, как перебалансировка продолжается. Однако это только синхронно внутри этого потребителя. Есть ли какой-то механизм, по которому координатор не будет действовать, пока не услышит ответ от всех подписавшихся потребителей?

1 ответ

Решение
  1. Я перечитал раздел в книге, и я согласен, что я тоже немного растерялся!

    Javadoc утверждает:

    Этот обратный вызов будет выполняться только в пользовательском потоке как часть вызова poll (long) при каждом изменении назначения раздела.

    Я взглянул на код и подтвердил, что методы слушателя ребалансировки действительно вызываются в том же потоке, который владеет Consumer.

  2. Да, вы должны использовать commitSync() при совершении перебалансировки слушателя.

    Чтобы объяснить почему, давайте посмотрим на пример золотого пути. Мы начинаем с потребителя, который с удовольствием и регулярно бьется с координатором. В какой-то момент координатор возвращает REBALANCE_IN_PROGRESS ошибка запроса сердцебиения. Это может быть вызвано новым участником, желающим присоединиться к группе, участником, который покидает или не может пульсировать, или новым разделом, добавляемым / удаляемым из подписки. На данный момент все потребители должны вернуться в группу.

    Прежде чем пытаться вернуться в группу, потребитель будет синхронно выполнять ConsumerRebalanceListener.onPartitionsRevoked(), Как только слушатель возвращается, потребитель отправит JoinRequest координатору, чтобы присоединиться к группе.

    Тем не менее, и я думаю, что это то, о чем вы думали, если ваш обратный вызов занимает слишком много времени (> session.timeout.ms) для фиксации, группа может быть уже в другом поколении, а разделы со смещением, пытающиеся быть зафиксированными, назначены другому участнику. В этом случае фиксация завершится неудачей, даже если она была синхронной. Но с помощью commitSync() в слушателе вам гарантировано, что потребитель не присоединится к группе до завершения фиксации.

Другие вопросы по тегам