Лучший способ реализовать производитель кафки в ruby ​​без потери данных

Каков наилучший способ реализации производителя Kafka в RoR с использованием гема ruby-kafka (0.7.4) без потери данных при сбое экземпляра?

Сообщение получается из нескольких потоков. Таким образом, простой вызов метода delivery_message не сработает. Итак, используя async_producer с delivery_interval 30 с и delivery_threshold 1000 сообщений.

Проблема с использованием async_producer заключается в том, что, если экземпляр не работает, все мои сообщения будут потеряны. В любом случае, я могу сохранить эти сообщения? Или любой другой лучший способ сделать то же самое?

def asyncThreadProduce
  threads=[]
  kaf = Kafka.new("localhost:9093")
  producer = kaf.async_producer(
    delivery_threshold: 1000,
    delivery_interval: 30,
  )
  threads << Thread.new{
    1000.times do |n|
      producer.produce(n.to_s + 'a', topic: 'test_topic')
    end
  }
  threads << Thread.new{
    1000.times do |n|
      producer.produce(n.to_s + 'b', topic: 'test_topic')
    end
  }
  threads.each { |thr| thr.join }
end

Приведенный выше код является симуляцией того, что происходит. Это работает ожидаемым образом. Единственный вопрос, как предотвратить сообщение в фоновом потоке в случае сбоя экземпляра?

Заранее спасибо!

0 ответов

Другие вопросы по тегам