Карафка start_from_beginning не работает должным образом

Я создал проект, чтобы помочь мне понять Kafka. Он настроен как три идентичных приложения Rails внутри Docker с Karafka, настроенным для приема сообщений - если вы создаете запись в одном, она реплицируется на два других. Я предположил, чтоstart_from_beginningустановка будет означать, что каждый раз, когда сервер Karafka будет перезапущен, он будет начинаться со смещения 0, но это действительно так. Может кто-нибудь объяснить, что я сделал не так, или поправить свое понимание.

Вот два важных раздела из karafka.rb

  setup do |config|
    config.kafka.seed_brokers = %w[kafka://kafka:9092]
    config.client_id = "app_#{ ENV['APP_ID'] }"
    config.logger = Rails.logger
  end

...

  consumer_groups.draw do
    topic :party do
      consumer PartyConsumer
      start_from_beginning true
    end
  end

Я уже пробовал поставить config.kafka.start_from_beginning = true в разделе конфигурации karafka.rb но без радости.

Когда я создаю запись в одном из приложений, она синхронизируется с двумя другими. Вот что я пытался сделать:

  • очистить базу данных в приложении 3
  • перезапустите сервер karafka в приложении 3 (with start_from_beginning = true)

На этом этапе я ожидал, что база данных будет воссоздана из Kafka путем перемотки на смещение 0 и воспроизведения всех сообщений. Что я пропустил?

Полный проект находится здесь: https://github.com/jcleary/kafka-demo

1 ответ

На самом деле проблема заключалась в моем понимании start_from_beginning, цель которого - решить, как вести себя новым потребителям. Ожидается, что существующие потребители всегда начнут с того места, где они остановились.

Я нашел два способа добиться того, что искал:

1 - Использование функции поиска Kafka (рекомендуется):

Из вики: https://github.com/karafka/karafka/wiki/Events-monitoring-and-logging

2 - Изменение client_id в karafka.rb, например

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka.seed_brokers = %w[kafka://kafka:9092]
    config.client_id = "app_#{ ENV['APP_ID'] }-#{ SecureRandom.uuid }"
    config.logger = Rails.logger
  end
  ... 

С помощью SecureRandom.uuid обеспечит вашему потребителю уникальный client_id и, следовательно, повторно обработать все сообщения.

Вариант 2 - это скорее взлом, но в зависимости от вашего варианта использования он может быть тем, что вы ищете.

Другие вопросы по тегам