spring-gration-mqtt с несколькими серверами Mqtt для подписки

Я использую Spring-интеграции-mqtt Spring, и я могу подключиться к одному серверу Mqtt и могу получать сообщения по темам, подписанным, и теперь я хочу сделать приложение, которое может подключаться к нескольким серверам Mqtt и может получать данные от каждого подключения, и я хочу управлять им как динамическим, где я могу добавить больше серверов Mqtt из базы данных или текстового файла.

простой бин для одного подключения Mqtt для подписки выглядит следующим образом

@Bean
public MessageProducer inbound() {

    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
                                             "DATA/#", "LD/#","CONF/#","CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);

    adapter2.setOutputChannel(mqttInputChannel());
    return adapter2;

}

Приведенный выше код создает соединение для сервера mqtt и может получать сообщения, и если я копирую, вставляю один и тот же код дважды для второго сервера с другим IP-адресом Mqtt, я могу подключиться к обоим серверам Mqtt следующим образом

@Bean
public MessageProducer inbound() {

    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
                                             "DATA/#", "LD/#","CONF/#","CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);

    adapter2.setOutputChannel(mqttInputChannel());
    return adapter2;

}

@Bean
public MessageProducer inbound2() {

    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
                                             "DATA/#", "LD/#","CONF/#","CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);

    adapter2.setOutputChannel(mqttInputChannel());
    return adapter2;

}

Приведенный выше код также работает нормально, и я могу получать сообщения от обоих Mqtt-серверов, но есть ли способ, которым я могу управлять им динамически, как показано ниже, я меняю тип возвращаемого объекта bean на list, но это не сработало:

  @Bean
  public List<MqttPahoMessageDrivenChannelAdapter> getAdapter () {
      List<MqttPahoMessageDrivenChannelAdapter > logConfList=new ArrayList<MqttPahoMessageDrivenChannelAdapter>();
      MqttPahoMessageDrivenChannelAdapter adapter2 =
              new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
                                               "DATA/#", "LD/#","CONF/#","CONFIG/#");
      adapter2.setCompletionTimeout(0);
      adapter2.setConverter(new DefaultPahoMessageConverter());
      adapter2.setQos(2);

      adapter2.setOutputChannel(mqttInputChannel() );

      MqttPahoMessageDrivenChannelAdapter adapter =
              new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
                                               "DATA/#", "LD/#","CONF/#","CONFIG/#");
      adapter.setCompletionTimeout(0);
      adapter.setConverter(new DefaultPahoMessageConverter());
      adapter.setQos(2);

      adapter.setOutputChannel(mqttInputChannel() );
      logConfList.add(adapter);
      logConfList.add(adapter2);

      return logConfList;

  }

Есть ли способ, которым я могу управлять этими bean-компонентами динамически, где я могу получить подробную информацию о mqtt-сервере из текстового файла и в цикле for или что-то еще, что я могу управлять несколькими соединениями.

1 ответ

Решение

См. Динамические и динамические интеграционные потоки.

@Autowired
private IntegrationFlowContext flowContext;

private IntegrationFlowRegistration addAnAdapter(String uri, String clientId, MessageChannel channel,
        String... topics) {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(uri, clientId, topics);
    // more adapter configuration
    IntegrationFlow flow = IntegrationFlows.from(adapter)
        .channel(channel)
        .get();
    return this.flowContext.registration(flow).register();
}

private void removeAdapter(IntegrationFlowRegistration flowReg) {
    this.flowContext.remove(flowReg.getId());
}
Другие вопросы по тегам