Можем ли мы группировать группы из 10 сообщений в комарах, используя весеннюю интеграцию

Это то, как я определил свое соединение mqtt с помощью весенней интеграции. Я не уверен, возможно ли это, но мы можем настроить работу подписчика mqtt после получения 10 сообщений. щас подписчик работает после публикации сообщения как надо.

    @Autowired
    ConnectorConfig config;


    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(config.getUrl());
        factory.setUserName(config.getUser());
        factory.setPassword(config.getPass());
        return factory;
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(config.getClientid(), mqttClientFactory(), "ALERT", "READING");

        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttRouterChannel());
        return adapter;
    }

   /**this is router**/
   @MessageEndpoint
   public class MessageRouter {

    private final Logger logger = LoggerFactory.getLogger(MessageRouter.class);


    static final String  ALERT = "ALERT";
    static final String  READING = "READING";

    @Router(inputChannel = "mqttRouterChannel")
    public String route(@Header("mqtt_topic") String topic){
        String route = null;
        switch (topic){
            case ALERT:
                logger.info("alert message received");
                route = "alertTransformerChannel";
                break;
            case READING:
                logger.info("reading message received");
                route = "readingTransformerChannel";
                break;
        }
        return route;
    }
 }

1 ответ

Решение

мне нужно группировать группы из 10 сообщений одновременно

Это не MqttPahoMessageDrivenChannelAdapter обязанность.

Мы используем там MqttCallback с этой семантикой:

 * @param topic name of the topic on the message was published to
 * @param message the actual message.
 * @throws Exception if a terminal error has occurred, and the client should be
 * shut down.
 */
public void messageArrived(String topic, MqttMessage message) throws Exception;

Поэтому мы не можем пакетировать их там на этом канальном адаптере по природе клиента Paho.

То, что мы можем предложить вам с точки зрения Spring Integration, - это реализация EIP Aggregator.

В вашем случае вы должны добавить @ServiceActivator для AggregatorFactoryBean@Bean до этого mqttRouterChannelперед отправкой в ​​роутер.

Это может быть так просто, как:

@Bean
@ServiceActivator(inputChannel = "mqttAggregatorChannel")
AggregatorFactoryBean mqttAggregator() {
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
    aggregator.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
    aggregator.setCorrelationStrategy(m -> 1);
    aggregator.setReleaseStrategy(new MessageCountReleaseStrategy(10));
    aggregator.setExpireGroupsUponCompletion(true);
    aggregator.setSendPartialResultOnExpiry(true);
    aggregator.setGroupTimeoutExpression(new ValueExpression<>(1000));
    aggregator.setOutputChannelName("mqttRouterChannel");
    return aggregator;
}

См. Дополнительную информацию в Справочном руководстве.

Другие вопросы по тегам