MQTT Сохраненные сообщения, не полученные при подписке на одну и ту же тему из разных приложений одновременно
TL;DR
При подписке на одно и то же дерево тем с несколькими клиентами одновременно не все клиенты получают сохраненные сообщения, как ожидалось!
подробно / вариант использования
В реальном проекте несколько приложений подписываются (почти одновременно, потому что они запускаются параллельно) на одну и ту же тему MQTT (с подстановочными знаками). В теме содержится около 500 сохраненных сообщений (каждое на уровне отдельного подтемы), которые, как ожидается, получат все приложения (они подписываются на QoS 1).
Кроме сообщений "конфигурация", темы данных подписаны тем же соединением MQTT. Постоянное состояние не требуется (и требуется здесь). Поэтому экземпляры приложения связаны с cleanSession=true
,
Насколько я понимаю, было бы достаточно, если бы каждый экземпляр приложения соединялся с фиксированным clientId как cleanSession=true, чтобы избежать какой-либо обработки состояния. Но чтобы быть действительно уверенным, что ни одно государство не считается unique MQTT clientId
генерируется для каждого соединения.
поведение наблюдателя
К сожалению, не все экземпляры приложений получают сохраненные сообщения. Некоторые вообще не получают сообщений из темы - независимо от того, как долго длится подписка. Сначала я подумал, что причиной может быть конфигурация maxInflight (на стороне клиента) или max_queued_messages (на стороне сервера), но после увеличения обоих до 500000 я полагаю, что это не является причиной сбоя.
воспроизведение как тест
Поэтому я создал этот проект GitHub с репро. В репро есть класс юнит-теста MqttSubscriptionTest
с методом испытаний multiThreadSubscriptionTest
, При выполнении этого теста некоторые (1000) сохраненных сообщений будут сначала опубликованы в @BeforeClass
метод. После этого 10 экземпляров MqttSubscriber
класс, который реализует IMqttMessageListener
а также Runnable
Интерфейс будет создан и выполнен. Каждый экземпляр MqttSubscriber будет выполняться в собственном потоке с собственным экземпляром MqttClient и будет подписываться на дерево тем с сохраненными сообщениями. Это регистрируется в консоли следующим образом:
----------- perform subscriptions
Subscriber-3 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-0 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-2 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-4 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-5 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-6 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-1 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-7 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-8 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-9 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Тест подождет некоторое время и после этого подтвердит подписку. Ожидается, что каждый подписчик получил 1000 сохраненных сообщений:
----------- validate subscriptions
Subscriber-4: receivedMessages=1000; duration=372ms; succeeded=true
Subscriber-0: receivedMessages=1000; duration=265ms; succeeded=true
Subscriber-5: receivedMessages=1000; duration=475ms; succeeded=true
Subscriber-7: receivedMessages=0; duration=0ms; succeeded=false
Subscriber-6: receivedMessages=1000; duration=473ms; succeeded=true
Subscriber-8: receivedMessages=0; duration=0ms; succeeded=false
Subscriber-9: receivedMessages=1000; duration=346ms; succeeded=true
Subscriber-3: receivedMessages=1000; duration=243ms; succeeded=true
Subscriber-1: receivedMessages=1000; duration=470ms; succeeded=true
Subscriber-2: receivedMessages=1000; duration=357ms; succeeded=true
Большинство абонентов получили ожидаемые 1000 сообщений за очень короткое время (несколько сотен мс). Но некоторые (здесь Subscriber-7/8) не получили ни одного сообщения (длительность равна 0, потому что они никогда не заканчивались). Ситуация не лучше, когда подписчики получают больше времени для получения сообщений. Они просто не получат их.
Я понятия не имею, почему это происходит. Сообщения об ошибках не отображаются на MQTT-посреднике или на стороне клиента. Если вы можете оказать какую-либо помощь, это было бы очень полезно для меня, потому что я зависел от надежной доставки сохраненных сообщений.
Воспроизведение на GitHub: https://github.com/FrVaBe/MQTT/tree/master/mqtt-client-showcase
- Я тестировал с местным брокером EMQ и HiveMQ. Если вы хотите запустить тест, вам нужно запустить брокер на вашем компьютере по адресу localhost:1883 или изменить конфигурацию в классе теста.
- Я использую Eclipse Paho Java Client MQTT
- Я подписываюсь с
cleanSession=true
потому что я не хочу иметь никакого состояния (соединение используется для подписки на несколько тем, а доставка пропущенных сообщений не требуется)
2 ответа
Люди из HiveMQ были достаточно любезны, чтобы взглянуть на эту проблему. Они подозревают причину в клиенте Paho и использование IMqttMessageListener
в подписке. Существует описанная проблема № 432 предполагаемого состояния гонки.
Извлеченный урок: лучшее использование MqttCallback
вместо IMqttMessageListener
Вы не можете использовать cleanSession(true)
во время подключения смотрите объяснение здесь: http://www.steves-internet-guide.com/mqtt-retained-messages-example/
Я использовал mosquito для тестов, и его внутренняя очередь сообщений составляет всего 100 по умолчанию.
HiveMQ имеет максимум сообщений в очереди.
Конфигурация Emq имеет нечто подобное http://emqtt.io/docs/v2/config.html.
Я исправил пример кода в репо. Работает для меня.
Редактировать: только что проверил с Emq (docker run --rm -ti --name emq -p 18083:18083 -p 1883:1883 quodt/emq-docker:latest
) и работает нормально.
По сути, это cleanSession: должно быть ложным. Кроме того, состояния ожидания в ваших тестовых кодах плохие. Они слишком короткие на моей машине. Используйте защелку или другой реальный механизм синхронизации.