Кафка ребалансировки и подводные камни для слушателей
Я читаю "Кафку: полное руководство" и хотел бы лучше понять слушателя с перебалансировкой. Пример в книге простой использует HashMap
сохранить текущие смещения, которые были обработаны и будут фиксировать текущее состояние при отзыве раздела. Мои опасения:
Есть две проблемы / вопросы, которые у меня есть вокруг примера кода:
- Используемый язык заставляет меня предположить, что эти обратные вызовы сделаны в другом потоке. Итак, не следует ли учитывать безопасность потоков при применении текущих смещений? Кроме того, не следует ли отменить текущий пакет после его фиксации?
- В нем говорится, что необходимо использовать commitSync, чтобы убедиться, что смещения фиксируются до того, как перебалансировка продолжается. Однако это только синхронно внутри этого потребителя. Есть ли какой-то механизм, по которому координатор не будет действовать, пока не услышит ответ от всех подписавшихся потребителей?
1 ответ
Я перечитал раздел в книге, и я согласен, что я тоже немного растерялся!
Javadoc утверждает:
Этот обратный вызов будет выполняться только в пользовательском потоке как часть вызова poll (long) при каждом изменении назначения раздела.
Я взглянул на код и подтвердил, что методы слушателя ребалансировки действительно вызываются в том же потоке, который владеет Consumer.
Да, вы должны использовать
commitSync()
при совершении перебалансировки слушателя.Чтобы объяснить почему, давайте посмотрим на пример золотого пути. Мы начинаем с потребителя, который с удовольствием и регулярно бьется с координатором. В какой-то момент координатор возвращает
REBALANCE_IN_PROGRESS
ошибка запроса сердцебиения. Это может быть вызвано новым участником, желающим присоединиться к группе, участником, который покидает или не может пульсировать, или новым разделом, добавляемым / удаляемым из подписки. На данный момент все потребители должны вернуться в группу.Прежде чем пытаться вернуться в группу, потребитель будет синхронно выполнять
ConsumerRebalanceListener.onPartitionsRevoked()
, Как только слушатель возвращается, потребитель отправит JoinRequest координатору, чтобы присоединиться к группе.Тем не менее, и я думаю, что это то, о чем вы думали, если ваш обратный вызов занимает слишком много времени (>
session.timeout.ms
) для фиксации, группа может быть уже в другом поколении, а разделы со смещением, пытающиеся быть зафиксированными, назначены другому участнику. В этом случае фиксация завершится неудачей, даже если она была синхронной. Но с помощьюcommitSync()
в слушателе вам гарантировано, что потребитель не присоединится к группе до завершения фиксации.