Общая подписка HiveMQ с постоянным сеансом
Попытка объединить две функции HiveMQ: общие подписки и постоянные сеансы.
Если создали очень простое сообщение производителя. И очень простой потребитель. При запуске нескольких потребителей все потребители получают все сообщения.
После установки clearSession в значение false для потребителей, при запуске потребителя и перезапуске потребителя, потребитель также получает сообщения, когда он не был подключен. Отлично.
Теперь в сочетании с функцией общей подписки. Когда используется только общая подписка, clearSession имеет значение "true". При запуске нескольких потребителей сообщение принимается только одним потребителем. Это должен быть циклический перебор, и это также имеет место, но как только вы останавливаете потребителя, сообщения перестают быть циклическим перебором, а один из потребителей получает значительно больше сообщений, чем другие.
Если я теперь снова включу постоянный сеанс, clearSession будет иметь значение "false" и запустит потребителей общей подписки, потребители начнут получать все сообщения снова, вместо того, чтобы сообщение просто доставлялось одному клиенту.
В чем здесь проблема? Это ошибка в HiveMQ? Может ли постоянный сеанс и общая подписка не использоваться вместе? Это было бы действительно облом.
ОБНОВЛЕНИЕ 15/2/2017 Как предложил @fraschbi, я очистил все данные и снова проверил общую подписку с постоянными пользователями сеансов. Кажется, работает!
Что странно, так это то, что пропущенные сообщения принимаются только после повторного подключения 1-го потребителя. У всех потребителей одинаковый код, они просто запускаются с разными аргументами clientId. Смотрите код ниже. Моя тестовая последовательность:
- Запустил потребитель1: все сообщения отправляются этому потребителю.
- Запущенный потребитель2: каждый потребитель получает все остальные сообщения.
- Запущенный потребитель3: каждый потребитель получает 1 из 3 сообщений.
- Остановка потребителя1: теперь потребители2 и 3 получают все остальные сообщения. (не знаю, почему я видел этот неравномерный дистрибутив вчера, но, может быть, как упомянул @fraschbi, потому что я повторно использовал clientId и не отписывался и не отключался должным образом)
- остановка потребителя2: все сообщения, полученные потребителем3.
- Остановка потребителя3: сообщения больше не принимаются.
- перезапуск потребителя3: он продолжается с первого сообщения, отправленного производителем. Он не получает потерянные сообщения.
- перезапустите потребителя2: сообщения снова распределяются равномерно.
- перезапустите потребителя1: ЭТОТ теперь получает все потерянные сообщения и продолжает получать каждые 1 из 3 сообщений.
Итак, мой новый вопрос: почему только 1-й потребитель получает потерянные сообщения?
Примечание: хитрость здесь заключается в том, чтобы не отписываться при остановке клиента, потому что тогда настройка подписки / постоянства теряется!
Producer.scala
object Producer extends App {
val topic = args(0)
val brokerUrl = "tcp://localhost:1883"
val clientId = UUID.randomUUID().toString
val client = new MqttClient(brokerUrl, clientId)
client.connect()
val theTopic = client.getTopic(topic)
var count = 0
sys.addShutdownHook {
println("Disconnecting client...")
client.disconnect()
println("Disconnected.")
}
while(true) {
val msg = new MqttMessage(s"Message: $count".getBytes())
theTopic.publish(msg)
println(s"Published: $msg")
Thread.sleep(1000)
count = count + 1
}
}
Consumer.scala
object Consumer extends App {
val topic = args(0)
val brokerUrl = "tcp://localhost:1883"
val clientId = args(1)
// val clientId = UUID.randomUUID().toString
val client = new MqttClient(brokerUrl, clientId)
client.setCallback(new MqttCallback {
override def deliveryComplete(token: IMqttDeliveryToken) = ()
override def messageArrived(topic: String, message: MqttMessage) = println(s"received on topic '$topic': ${new String(message.getPayload)}")
override def connectionLost(cause: Throwable) = println("Connection lost")
})
println(s"Start $clientId consuming from topic: $topic")
val options = new MqttConnectOptions()
options.setCleanSession(false);
client.connect(options)
client.subscribe(topic)
sys.addShutdownHook {
println("Disconnecting client...")
// client.unsubscribe(topic)
client.disconnect()
println("Disconnected.")
}
while(true) {
}
}
1 ответ
Я постараюсь ответить на два вопроса, которые вы испытываете отдельно.
Это должен быть циклический перебор, и это также имеет место, но как только вы останавливаете потребителя, сообщения перестают быть циклическим перебором, а один из потребителей получает значительно больше сообщений, чем другие.
HiveMQ предпочитает онлайн-клиентов при распространении сообщений для общих подписок.
Если я теперь снова включу постоянный сеанс, clearSession будет иметь значение "false" и запустит потребителей общей подписки, потребители начнут получать все сообщения снова, вместо того, чтобы сообщение просто доставлялось одному клиенту.
В начале вашего вопроса вы сказали, что соединяете клиентов с cleanSession=false
брокеру и подписке на тему. (Звучит так, как будто вы используете только одну тему.) Возможно ли, что вы не отменили подписку этих клиентов перед повторным подключением к cleanSession=false
а общие подписки? В этом случае подписки на первом этапе вашего сценария будут по-прежнему сохраняться для этих клиентов, и, естественно, каждый из них будет получать сообщения.
РЕДАКТИРОВАТЬ:
Итак, мой новый вопрос: почему только 1-й потребитель получает потерянные сообщения?
Из руководства пользователя HiveMQ:
Когда автономная очередь клиентов заполнена, сообщение для этого клиента не будет отброшено, но будет поставлено в очередь для следующего автономного клиента в группе общих подписок.
Когда все клиенты находятся в автономном режиме, рассылка больше не является циклическим. Таким образом, сценарий, который вы описываете, находится в пределах ожидаемого поведения.
Значение по умолчанию для очереди сообщений - 1000. Таким образом, вы можете отправить более 1000 сообщений, пока клиенты не подключены, или уменьшить размер очереди сообщений.
...
<persistence>
<queued-messages>
<max-queued-messages>50</max-queued-messages>
</queued-messages>
...
</persistence>
...
Добавьте это к вашему config.xml
для уменьшения размера очереди сообщений.