Можем ли мы группировать группы из 10 сообщений в комарах, используя весеннюю интеграцию
Это то, как я определил свое соединение mqtt с помощью весенней интеграции. Я не уверен, возможно ли это, но мы можем настроить работу подписчика mqtt после получения 10 сообщений. щас подписчик работает после публикации сообщения как надо.
@Autowired
ConnectorConfig config;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(config.getUrl());
factory.setUserName(config.getUser());
factory.setPassword(config.getPass());
return factory;
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(config.getClientid(), mqttClientFactory(), "ALERT", "READING");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttRouterChannel());
return adapter;
}
/**this is router**/
@MessageEndpoint
public class MessageRouter {
private final Logger logger = LoggerFactory.getLogger(MessageRouter.class);
static final String ALERT = "ALERT";
static final String READING = "READING";
@Router(inputChannel = "mqttRouterChannel")
public String route(@Header("mqtt_topic") String topic){
String route = null;
switch (topic){
case ALERT:
logger.info("alert message received");
route = "alertTransformerChannel";
break;
case READING:
logger.info("reading message received");
route = "readingTransformerChannel";
break;
}
return route;
}
}
1 ответ
мне нужно группировать группы из 10 сообщений одновременно
Это не MqttPahoMessageDrivenChannelAdapter
обязанность.
Мы используем там MqttCallback
с этой семантикой:
* @param topic name of the topic on the message was published to
* @param message the actual message.
* @throws Exception if a terminal error has occurred, and the client should be
* shut down.
*/
public void messageArrived(String topic, MqttMessage message) throws Exception;
Поэтому мы не можем пакетировать их там на этом канальном адаптере по природе клиента Paho.
То, что мы можем предложить вам с точки зрения Spring Integration, - это реализация EIP Aggregator.
В вашем случае вы должны добавить @ServiceActivator
для AggregatorFactoryBean
@Bean
до этого mqttRouterChannel
перед отправкой в роутер.
Это может быть так просто, как:
@Bean
@ServiceActivator(inputChannel = "mqttAggregatorChannel")
AggregatorFactoryBean mqttAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
aggregator.setCorrelationStrategy(m -> 1);
aggregator.setReleaseStrategy(new MessageCountReleaseStrategy(10));
aggregator.setExpireGroupsUponCompletion(true);
aggregator.setSendPartialResultOnExpiry(true);
aggregator.setGroupTimeoutExpression(new ValueExpression<>(1000));
aggregator.setOutputChannelName("mqttRouterChannel");
return aggregator;
}
См. Дополнительную информацию в Справочном руководстве.