spring-gration-mqtt с несколькими серверами Mqtt для подписки
Я использую Spring-интеграции-mqtt Spring, и я могу подключиться к одному серверу Mqtt и могу получать сообщения по темам, подписанным, и теперь я хочу сделать приложение, которое может подключаться к нескольким серверам Mqtt и может получать данные от каждого подключения, и я хочу управлять им как динамическим, где я могу добавить больше серверов Mqtt из базы данных или текстового файла.
простой бин для одного подключения Mqtt для подписки выглядит следующим образом
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel());
return adapter2;
}
Приведенный выше код создает соединение для сервера mqtt и может получать сообщения, и если я копирую, вставляю один и тот же код дважды для второго сервера с другим IP-адресом Mqtt, я могу подключиться к обоим серверам Mqtt следующим образом
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel());
return adapter2;
}
@Bean
public MessageProducer inbound2() {
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel());
return adapter2;
}
Приведенный выше код также работает нормально, и я могу получать сообщения от обоих Mqtt-серверов, но есть ли способ, которым я могу управлять им динамически, как показано ниже, я меняю тип возвращаемого объекта bean на list, но это не сработало:
@Bean
public List<MqttPahoMessageDrivenChannelAdapter> getAdapter () {
List<MqttPahoMessageDrivenChannelAdapter > logConfList=new ArrayList<MqttPahoMessageDrivenChannelAdapter>();
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel() );
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter.setCompletionTimeout(0);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel() );
logConfList.add(adapter);
logConfList.add(adapter2);
return logConfList;
}
Есть ли способ, которым я могу управлять этими bean-компонентами динамически, где я могу получить подробную информацию о mqtt-сервере из текстового файла и в цикле for или что-то еще, что я могу управлять несколькими соединениями.
1 ответ
См. Динамические и динамические интеграционные потоки.
@Autowired
private IntegrationFlowContext flowContext;
private IntegrationFlowRegistration addAnAdapter(String uri, String clientId, MessageChannel channel,
String... topics) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(uri, clientId, topics);
// more adapter configuration
IntegrationFlow flow = IntegrationFlows.from(adapter)
.channel(channel)
.get();
return this.flowContext.registration(flow).register();
}
private void removeAdapter(IntegrationFlowRegistration flowReg) {
this.flowContext.remove(flowReg.getId());
}