Общая подписка HiveMQ с постоянным сеансом

Попытка объединить две функции HiveMQ: общие подписки и постоянные сеансы.

Если создали очень простое сообщение производителя. И очень простой потребитель. При запуске нескольких потребителей все потребители получают все сообщения.

После установки clearSession в значение false для потребителей, при запуске потребителя и перезапуске потребителя, потребитель также получает сообщения, когда он не был подключен. Отлично.

Теперь в сочетании с функцией общей подписки. Когда используется только общая подписка, clearSession имеет значение "true". При запуске нескольких потребителей сообщение принимается только одним потребителем. Это должен быть циклический перебор, и это также имеет место, но как только вы останавливаете потребителя, сообщения перестают быть циклическим перебором, а один из потребителей получает значительно больше сообщений, чем другие.

Если я теперь снова включу постоянный сеанс, clearSession будет иметь значение "false" и запустит потребителей общей подписки, потребители начнут получать все сообщения снова, вместо того, чтобы сообщение просто доставлялось одному клиенту.

В чем здесь проблема? Это ошибка в HiveMQ? Может ли постоянный сеанс и общая подписка не использоваться вместе? Это было бы действительно облом.

ОБНОВЛЕНИЕ 15/2/2017 Как предложил @fraschbi, я очистил все данные и снова проверил общую подписку с постоянными пользователями сеансов. Кажется, работает!

Что странно, так это то, что пропущенные сообщения принимаются только после повторного подключения 1-го потребителя. У всех потребителей одинаковый код, они просто запускаются с разными аргументами clientId. Смотрите код ниже. Моя тестовая последовательность:

  • Запустил потребитель1: все сообщения отправляются этому потребителю.
  • Запущенный потребитель2: каждый потребитель получает все остальные сообщения.
  • Запущенный потребитель3: каждый потребитель получает 1 из 3 сообщений.
  • Остановка потребителя1: теперь потребители2 и 3 получают все остальные сообщения. (не знаю, почему я видел этот неравномерный дистрибутив вчера, но, может быть, как упомянул @fraschbi, потому что я повторно использовал clientId и не отписывался и не отключался должным образом)
  • остановка потребителя2: все сообщения, полученные потребителем3.
  • Остановка потребителя3: сообщения больше не принимаются.
  • перезапуск потребителя3: он продолжается с первого сообщения, отправленного производителем. Он не получает потерянные сообщения.
  • перезапустите потребителя2: сообщения снова распределяются равномерно.
  • перезапустите потребителя1: ЭТОТ теперь получает все потерянные сообщения и продолжает получать каждые 1 из 3 сообщений.

Итак, мой новый вопрос: почему только 1-й потребитель получает потерянные сообщения?

Примечание: хитрость здесь заключается в том, чтобы не отписываться при остановке клиента, потому что тогда настройка подписки / постоянства теряется!

Producer.scala

object Producer extends App {

  val topic = args(0)
  val brokerUrl = "tcp://localhost:1883"

  val clientId = UUID.randomUUID().toString

  val client = new MqttClient(brokerUrl, clientId)
  client.connect()
  val theTopic = client.getTopic(topic)

  var count = 0

  sys.addShutdownHook {
    println("Disconnecting client...")
    client.disconnect()
    println("Disconnected.")
  }

  while(true) {
    val msg = new MqttMessage(s"Message: $count".getBytes())
    theTopic.publish(msg)
    println(s"Published: $msg")

    Thread.sleep(1000)

    count = count + 1
  }
}

Consumer.scala

object Consumer extends App {

  val topic = args(0)
  val brokerUrl = "tcp://localhost:1883"

  val clientId = args(1)
//  val clientId = UUID.randomUUID().toString

  val client = new MqttClient(brokerUrl, clientId)
  client.setCallback(new MqttCallback {
    override def deliveryComplete(token: IMqttDeliveryToken) = ()

    override def messageArrived(topic: String, message: MqttMessage) = println(s"received on topic '$topic': ${new String(message.getPayload)}")

    override def connectionLost(cause: Throwable) = println("Connection lost")
  })

  println(s"Start $clientId consuming from topic: $topic")
  val options = new MqttConnectOptions()
  options.setCleanSession(false);

  client.connect(options)
  client.subscribe(topic)

  sys.addShutdownHook {
    println("Disconnecting client...")
//    client.unsubscribe(topic)
    client.disconnect()
    println("Disconnected.")
  }


  while(true) {

  }

}

1 ответ

Я постараюсь ответить на два вопроса, которые вы испытываете отдельно.

Это должен быть циклический перебор, и это также имеет место, но как только вы останавливаете потребителя, сообщения перестают быть циклическим перебором, а один из потребителей получает значительно больше сообщений, чем другие.

HiveMQ предпочитает онлайн-клиентов при распространении сообщений для общих подписок.

Если я теперь снова включу постоянный сеанс, clearSession будет иметь значение "false" и запустит потребителей общей подписки, потребители начнут получать все сообщения снова, вместо того, чтобы сообщение просто доставлялось одному клиенту.

В начале вашего вопроса вы сказали, что соединяете клиентов с cleanSession=false брокеру и подписке на тему. (Звучит так, как будто вы используете только одну тему.) Возможно ли, что вы не отменили подписку этих клиентов перед повторным подключением к cleanSession=false а общие подписки? В этом случае подписки на первом этапе вашего сценария будут по-прежнему сохраняться для этих клиентов, и, естественно, каждый из них будет получать сообщения.

РЕДАКТИРОВАТЬ:

Итак, мой новый вопрос: почему только 1-й потребитель получает потерянные сообщения?

Из руководства пользователя HiveMQ:

Когда автономная очередь клиентов заполнена, сообщение для этого клиента не будет отброшено, но будет поставлено в очередь для следующего автономного клиента в группе общих подписок.

Когда все клиенты находятся в автономном режиме, рассылка больше не является циклическим. Таким образом, сценарий, который вы описываете, находится в пределах ожидаемого поведения.

Значение по умолчанию для очереди сообщений - 1000. Таким образом, вы можете отправить более 1000 сообщений, пока клиенты не подключены, или уменьшить размер очереди сообщений.

...
<persistence>
     <queued-messages>

         <max-queued-messages>50</max-queued-messages>

     </queued-messages>
    ...
</persistence>
...

Добавьте это к вашему config.xml для уменьшения размера очереди сообщений.

Другие вопросы по тегам