Лучший способ реализовать производитель кафки в ruby без потери данных
Каков наилучший способ реализации производителя Kafka в RoR с использованием гема ruby-kafka (0.7.4) без потери данных при сбое экземпляра?
Сообщение получается из нескольких потоков. Таким образом, простой вызов метода delivery_message не сработает. Итак, используя async_producer с delivery_interval 30 с и delivery_threshold 1000 сообщений.
Проблема с использованием async_producer заключается в том, что, если экземпляр не работает, все мои сообщения будут потеряны. В любом случае, я могу сохранить эти сообщения? Или любой другой лучший способ сделать то же самое?
def asyncThreadProduce
threads=[]
kaf = Kafka.new("localhost:9093")
producer = kaf.async_producer(
delivery_threshold: 1000,
delivery_interval: 30,
)
threads << Thread.new{
1000.times do |n|
producer.produce(n.to_s + 'a', topic: 'test_topic')
end
}
threads << Thread.new{
1000.times do |n|
producer.produce(n.to_s + 'b', topic: 'test_topic')
end
}
threads.each { |thr| thr.join }
end
Приведенный выше код является симуляцией того, что происходит. Это работает ожидаемым образом. Единственный вопрос, как предотвратить сообщение в фоновом потоке в случае сбоя экземпляра?
Заранее спасибо!