Подписка СИ на несколько тем mqtt
Я пытаюсь научиться обрабатывать сообщения MQTT в Spring-Integration. Создал конвертер, который подписывается с помощью одного MqttPahoMessageDrivenChannelAdapter на тему MQTT для использования и преобразования сообщений.
Проблема в том, что наш провайдер данных планирует ускорить публикацию сообщений на его стороне. Таким образом, вместо нескольких (<=10) тем, в каждой из которых есть сообщения с примерно 150 полями, планируется опубликовать каждое из этих полей в отдельной теме MQTT.
Это означает, что мой конвертер должен был бы потреблять ок. 1000 mqtt тем, но я не знаю,
- Весенняя интеграция все еще хороший выбор для этого. Причины афаик. упомянутый адаптер использует PAHO MqttClient, который будет принимать сообщения от всех тем, на которые он подписан, в одном потоке, и создание 1000 экземпляров этих адаптеров является излишним.
- Если мы продолжим интеграцию Spring и будем использовать предоставленные компоненты, было бы неплохо создать единый входящий адаптер для всех полей, которые ранее были в сообщениях по одной теме, но перенести преобразование из компонента адаптера в отдельный bean-компонент (который выполняет преобразование), связанный с каналом-исполнителем с адаптером и, таким образом, выполняющий преобразование этих полей в некотором пуле потоков параллельно.
Заранее спасибо за ваши ответы!
1 ответ
Я думаю, что ваша идея имеет смысл.
Для этого вам нужно осуществить сквозной переход MqttMessageConverter
и предоставить MqttMessage
как payload
а также topic
в качестве заголовка:
public class PassThroughMqttMessageConverter implements MqttMessageConverter {
@Override
public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
return MessageBuilder.withPayload(mqttMessage)
.setHeader(MqttHeaders.RECEIVED_TOPIC, topic)
.build();
}
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
return null;
}
@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
return null;
}
}
Таким образом, вы действительно сможете выполнить целевое преобразование вниз по течению, после указанного ExecutorChannel
в обычае transformer
,
Вы также можете рассмотреть возможность реализации пользовательских MqttPahoClientFactory
(продолжение DefaultMqttPahoClientFactory
может работать так же) и предоставить кастом ScheduledExecutorService
впрыскивать в MqttClient
вы собираетесь создать в getClientInstance()
,