Карафка start_from_beginning не работает должным образом
Я создал проект, чтобы помочь мне понять Kafka. Он настроен как три идентичных приложения Rails внутри Docker с Karafka, настроенным для приема сообщений - если вы создаете запись в одном, она реплицируется на два других. Я предположил, чтоstart_from_beginning
установка будет означать, что каждый раз, когда сервер Karafka будет перезапущен, он будет начинаться со смещения 0, но это действительно так. Может кто-нибудь объяснить, что я сделал не так, или поправить свое понимание.
Вот два важных раздела из karafka.rb
setup do |config|
config.kafka.seed_brokers = %w[kafka://kafka:9092]
config.client_id = "app_#{ ENV['APP_ID'] }"
config.logger = Rails.logger
end
...
consumer_groups.draw do
topic :party do
consumer PartyConsumer
start_from_beginning true
end
end
Я уже пробовал поставить config.kafka.start_from_beginning = true
в разделе конфигурации karafka.rb
но без радости.
Когда я создаю запись в одном из приложений, она синхронизируется с двумя другими. Вот что я пытался сделать:
- очистить базу данных в приложении 3
- перезапустите сервер karafka в приложении 3 (
with start_from_beginning = true
)
На этом этапе я ожидал, что база данных будет воссоздана из Kafka путем перемотки на смещение 0 и воспроизведения всех сообщений. Что я пропустил?
Полный проект находится здесь: https://github.com/jcleary/kafka-demo
1 ответ
На самом деле проблема заключалась в моем понимании start_from_beginning
, цель которого - решить, как вести себя новым потребителям. Ожидается, что существующие потребители всегда начнут с того места, где они остановились.
Я нашел два способа добиться того, что искал:
1 - Использование функции поиска Kafka (рекомендуется):
Из вики: https://github.com/karafka/karafka/wiki/Events-monitoring-and-logging
2 - Изменение client_id
в karafka.rb
, например
class KarafkaApp < Karafka::App
setup do |config|
config.kafka.seed_brokers = %w[kafka://kafka:9092]
config.client_id = "app_#{ ENV['APP_ID'] }-#{ SecureRandom.uuid }"
config.logger = Rails.logger
end
...
С помощью SecureRandom.uuid
обеспечит вашему потребителю уникальный client_id
и, следовательно, повторно обработать все сообщения.
Вариант 2 - это скорее взлом, но в зависимости от вашего варианта использования он может быть тем, что вы ищете.