RabbitMQ прекращает публиковать сообщения

Я пишу скрипт для отправки настраиваемых push-уведомлений нашим пользователям нашего приложения. Я использую драгоценный камень amqp. Я установил тестовый обмен (прямой) и привязал к нему одну очередь. Когда я запускаю скрипт, я вижу, что первые несколько сообщений опубликованы, но ни одного после этого. Я не уверен, что я сделал неправильно, но я предполагаю, что это как-то связано с тем, как я использую API EventMachine.

Вот модуль / функция Rabbit, который публикует сообщение в формате JSON. Это было в значительной степени скопировано / вставлено из различных учебных пособий / сайтов: модуль Rabbit

# functions needed to communicate with RabbitMQ server
    def publish_message_to_direct(server_params, exchange, messages)
        begin
            AMQP.start(server_params) do |connection|
                channel  = AMQP::Channel.new(connection)
                exchange = channel.direct(exchange,:durable=>true) 
                messages.each do
                    |msg|
                    exchange.publish(msg) do
                        puts "just published"   
                        connection.close do
                            puts "connection was closed!"
                            #EM.stop
                        end
                    end
                end
            end
        rescue Exception => e
            puts "#{e.message} #{e.backtrace.join("\n")}"
        end
        nil
    end
end

Вот код EM, который вызывает функцию публикации:

userids = Array.new
userids.push(1,2,3,12,23,34,56) # this could be any array of integers
EventMachine.run do
  exchangeName = 'test.push'
  EM.add_periodic_timer(1.0*delay/1000) do
    userid = $userids.pop
    data = Push.get_push_tokens_for_userid(userid) # we have to query db for tokens
    user_notifications = nil
    user_notifications = Array.new
    data.each_hash do 
      |row|
      begin
        notification = Rabbit.create_push_notification_json(
          "This is a push notification!",
          row['userid'].to_i, 
          row['token'], 
          Push.get_device_push_type(row['applicationid'].to_i)
          )   
        user_notifications.push(notification)
        pushEvents += 1
      rescue Exception => e 
        puts "#{e.message} #{e.backtrace.join("\n")}"
      end
    end
    if $userids.empty?
      EM.stop_event_loop
    else
      puts "about to publish"
      Rabbit.publish_message_to_direct(rmq_params,exchangeName,user_notifications)
      userids_pushed.push(userid)
    end
  end

  EM.add_timer(time) do
      EM.stop_event_loop
  end

  Signal.trap("INT") do
    EM.stop_event_loop
  end
end

Вывод выглядит так: несколько сообщений публикуются в начале, но затем в какой-то момент они просто останавливаются:

just published
just published
just published
just published
just published
just published
connection was closed!
connection was closed!
connection was closed!
connection was closed!
connection was closed!
connection was closed!
about to publish
just published
connection was closed!
about to publish
just published
just published
just published
connection was closed!
connection was closed!
connection was closed!
about to publish
just published
connection was closed!
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
about to publish
   .
    .
    .
    about to publish

Честно говоря, я понятия не имею, что происходит в этот момент. Кажется, что соединение закрывается несколько раз, и тогда я предполагаю, что не восстановится в какой-то момент. Что вызвало бы такое поведение и как я могу это исправить? Любая помощь с благодарностью.

0 ответов

Другие вопросы по тегам