Динамическая генерация TCP-клиента с входящим каналом и ответным каналом

Я новичок в интеграции Spring.

Работа с Spring 4, только java аннотации.

В проекте, в котором я сейчас работаю, мы установили tcp-соединение в файле свойств.

На данный момент он жестко запрограммирован только на 2 разных соединения, и его необходимо заменить на более динамичный подход, где мы можем установить переменное количество их в файле свойств и иметь возможность добавлять новые во время выполнения.

Я знаю о существовании примера динамического tcp-клиента и пытался основывать свою работу на нем.

Сначала мы устанавливаем следующий компонент для соединения:

@Bean(name = "node1TCPConnection")
public AbstractClientConnectionFactory node1TCPConnection() {
  final TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory(
  env.getProperty("socket.tcp.nodes[0].ip"), 
  env.getProperty("socket.tcp.nodes[0].port", Integer.class)
  );

  tcpNetClientConnectionFactory.setSingleUse(false);
  tcpNetClientConnectionFactory.setSoKeepAlive(true);

  final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);

  tcpNetClientConnectionFactory.setSerializer(by);
  tcpNetClientConnectionFactory.setDeserializer(by);
  return tcpNetClientConnectionFactory;
}

Тогда у нас есть адаптер, который ожидает что-то для отправки:

@Bean
public TcpReceivingChannelAdapter node1TcpReaderClient(
        @Qualifier("node1TCPConnection") final AbstractClientConnectionFactory connectionFactory) {
    final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
    adapter.setConnectionFactory(connectionFactory);
    adapter.setClientMode(true);
    adapter.setErrorChannelName("errorChannel");
    adapter.setRetryInterval(retryInterval);
    adapter.setOutputChannel(fromTcp());
    return adapter;
}

Когда вызывается fromTcp(), он преобразует сообщение, и следующий код отправляет его другому приложению для дальнейшей обработки.

@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
    sendToApi(inMessage, headerMap);
}

Когда сообщение обработано, мы должны отправить ответ.

@Bean
@ServiceActivator(inputChannel = "toTcpCh01")
public TcpSendingMessageHandler tcpOutGateCh01(
        final @Qualifier("node1TCPConnection") AbstractClientConnectionFactory connectionFactory) {
    final TcpSendingMessageHandler tcpSendingMsgHandler = new TcpSendingMessageHandler();
    tcpSendingMsgHandler.setConnectionFactory(connectionFactory);
    return tcpSendingMsgHandler;
}

и используя шлюз:

@MessagingGateway()
public interface MessageTcpGateway {

  @Gateway(requestChannel = "toTcpCh01")
  ListenableFuture<Void> sendTcpChannel01(@Header("host") String host, byte[] inMessage);
}

мы отправим его обратно.

На примере я могу понять, как динамически создавать поток для ответа.

Но я не могу понять, как создать общий пул подключений, а затем создавать прослушивающие и ответные адаптеры на лету на основе этих соединений, а затем закрывать / удалять их во время выполнения.

Я несколько понимаю, как сделать поток с входящим адаптером благодаря этому вопросу

Нужно ли создавать несколько отдельных IntegrationFlow для каждого адаптера? поэтому все вызовы и ответы могут быть обработаны асинхронно (я могу ошибаться насчет асинхронности)

а затем обрабатывать их отдельно при желании закрыть соединение? например, вызов близко к TcpReceivingChannelAdapter, а затем к TcpSendingMessageHandler и, наконец, отмена регистрации соединения?

1 ответ

Решение

Я не думаю, что для совместной работы адаптеров каналов вам нужно IntegrationFlow определения для TcpReceivingChannelAdapter а также TcpSendingMessageHandler, Это действительно можно сделать как единый IntegrationFlow начиная с TcpReceivingChannelAdapter и заканчивая TcpSendingMessageHandler, Дело в том, что IntegrationFlow По сути, это просто логический контейнер для группировки ссылок на компоненты. Тяжелая работа действительно выполняется всеми теми компонентами, которые вы декларируете там, и с этим TcpReceivingChannelAdapter в TcpSendingMessageHandler и шлюз между вами действительно будет асинхронным.

Пожалуйста, имейте в виду, что ByteArrayLengthHeaderSerializer также должен быть объявлен как боб. Не уверен, что вам нужен отдельный экземпляр для каждого динамического потока, но вот API для этого в любом случае:

    /**
     * Add an object which will be registered as an {@link IntegrationFlow} dependant bean in the
     * application context. Usually it is some support component, which needs an application context.
     * For example dynamically created connection factories or header mappers for AMQP, JMS, TCP etc.
     * @param bean an additional arbitrary bean to register into the application context.
     * @return the current builder instance
     */
    IntegrationFlowRegistrationBuilder addBean(Object bean);
Другие вопросы по тегам