Подписка СИ на несколько тем mqtt

Я пытаюсь научиться обрабатывать сообщения MQTT в Spring-Integration. Создал конвертер, который подписывается с помощью одного MqttPahoMessageDrivenChannelAdapter на тему MQTT для использования и преобразования сообщений.

Проблема в том, что наш провайдер данных планирует ускорить публикацию сообщений на его стороне. Таким образом, вместо нескольких (<=10) тем, в каждой из которых есть сообщения с примерно 150 полями, планируется опубликовать каждое из этих полей в отдельной теме MQTT.

Это означает, что мой конвертер должен был бы потреблять ок. 1000 mqtt тем, но я не знаю,

  1. Весенняя интеграция все еще хороший выбор для этого. Причины афаик. упомянутый адаптер использует PAHO MqttClient, который будет принимать сообщения от всех тем, на которые он подписан, в одном потоке, и создание 1000 экземпляров этих адаптеров является излишним.
  2. Если мы продолжим интеграцию Spring и будем использовать предоставленные компоненты, было бы неплохо создать единый входящий адаптер для всех полей, которые ранее были в сообщениях по одной теме, но перенести преобразование из компонента адаптера в отдельный bean-компонент (который выполняет преобразование), связанный с каналом-исполнителем с адаптером и, таким образом, выполняющий преобразование этих полей в некотором пуле потоков параллельно.

Заранее спасибо за ваши ответы!

1 ответ

Решение

Я думаю, что ваша идея имеет смысл.

Для этого вам нужно осуществить сквозной переход MqttMessageConverter и предоставить MqttMessage как payload а также topic в качестве заголовка:

public class PassThroughMqttMessageConverter implements MqttMessageConverter {

    @Override
    public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
        return MessageBuilder.withPayload(mqttMessage)
                .setHeader(MqttHeaders.RECEIVED_TOPIC, topic)
                .build();
    }

    @Override
    public Object fromMessage(Message<?> message, Class<?> targetClass) {
        return null;
    }

    @Override
    public Message<?> toMessage(Object payload, MessageHeaders headers) {
        return null;
    }

}

Таким образом, вы действительно сможете выполнить целевое преобразование вниз по течению, после указанного ExecutorChannel в обычае transformer,

Вы также можете рассмотреть возможность реализации пользовательских MqttPahoClientFactory (продолжение DefaultMqttPahoClientFactory может работать так же) и предоставить кастом ScheduledExecutorService впрыскивать в MqttClient вы собираетесь создать в getClientInstance(),

Другие вопросы по тегам