Ruby-kafka Прочитать все сообщения темы и выйти

Мне нужно прочитать все сообщения из темы Kafka, затем обработать и выйти (не нужно вечно работать как демон). Я написал код, как показано ниже, он служит цели, если сообщения доступны в теме, если тема пуста (или нет нового сообщения для указанного Group_id), она будет ждать, пока не придет следующее сообщение, мне нужно немедленно выйти, если сообщение недоступно для процесс. Пожалуйста, посмотрите мой код и предложите лучший способ добиться этого. Я использую драгоценный камень ruby-kafka 1.3.0 .

      require 'kafka'
khost = 'xxx.xxx.xxx.xxx'
kport = 'xxxx'

kafka = Kafka.new(["#{khost}:#{kport}"] )
consumer = kafka.consumer(group_id: "my-consumer")
consumer.subscribe("my-topic")

consumer.each_batch do |batch|
    $msg = batch
    consumer.stop  # stop after reading first batch
end 

# Process messages here

$msg.messages.each do |message|
  puts message.value
end 

Я также нашел метод , Однако я не нашел возможности поддерживать и отслеживать уже обработанные сообщения без добавления дополнительного кода.

0 ответов

Другие вопросы по тегам