Варианты использования для ручного управления смещением в Кафке

Я пытаюсь реализовать потребителя Kafka на Java.

Предположим, что потребитель содержит некоторую логику обработки сообщений, которая может вызвать исключение. В этом случае потребитель должен поспать некоторое время и обработать последнее сообщение.

Моя идея состояла в том, чтобы использовать ручное управление смещением: смещение не фиксируется при сбое, поэтому потребитель предположительно будет читать из старого смещения.

Во время тестирования я обнаружил, что сообщение действительно читается только один раз, несмотря на то, что смещение не зафиксировано. Последнее зафиксированное смещение учитывается только при перезапуске приложения.

Мои вопросы:

  • Правильно ли я поступаю?
  • Каковы варианты использования для ручного управления смещением?

1 ответ

Решение

KafkaConsumer сохранить последние смещения в памяти, поэтому, если возникает исключение (и вы восстанавливаетесь после него), и вы хотите прочитать сообщение во второй раз, вам необходимо использовать seek() перед опросом во второй раз.

Фиксация смещений является "единственной" для сохранения смещений при выключении или сбое клиента (т. Е. Смещения надежно хранятся по сравнению с оперативной памятью). При запуске клиента извлекаются последние зафиксированные смещения, а затем клиент использует только собственные смещения в памяти.

Ручное управление смещением полезно, если вы хотите "связать" коммиты смещения с каким-либо другим действием (например, второй "коммит" в другой системе, который должен быть синхронизирован с коммитами смещения Kafka).

Другие вопросы по тегам