Динамическая генерация TCP-клиента с входящим каналом и ответным каналом
Я новичок в интеграции Spring.
Работа с Spring 4, только java аннотации.
В проекте, в котором я сейчас работаю, мы установили tcp-соединение в файле свойств.
На данный момент он жестко запрограммирован только на 2 разных соединения, и его необходимо заменить на более динамичный подход, где мы можем установить переменное количество их в файле свойств и иметь возможность добавлять новые во время выполнения.
Я знаю о существовании примера динамического tcp-клиента и пытался основывать свою работу на нем.
Сначала мы устанавливаем следующий компонент для соединения:
@Bean(name = "node1TCPConnection")
public AbstractClientConnectionFactory node1TCPConnection() {
final TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory(
env.getProperty("socket.tcp.nodes[0].ip"),
env.getProperty("socket.tcp.nodes[0].port", Integer.class)
);
tcpNetClientConnectionFactory.setSingleUse(false);
tcpNetClientConnectionFactory.setSoKeepAlive(true);
final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);
tcpNetClientConnectionFactory.setSerializer(by);
tcpNetClientConnectionFactory.setDeserializer(by);
return tcpNetClientConnectionFactory;
}
Тогда у нас есть адаптер, который ожидает что-то для отправки:
@Bean
public TcpReceivingChannelAdapter node1TcpReaderClient(
@Qualifier("node1TCPConnection") final AbstractClientConnectionFactory connectionFactory) {
final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(connectionFactory);
adapter.setClientMode(true);
adapter.setErrorChannelName("errorChannel");
adapter.setRetryInterval(retryInterval);
adapter.setOutputChannel(fromTcp());
return adapter;
}
Когда вызывается fromTcp(), он преобразует сообщение, и следующий код отправляет его другому приложению для дальнейшей обработки.
@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
sendToApi(inMessage, headerMap);
}
Когда сообщение обработано, мы должны отправить ответ.
@Bean
@ServiceActivator(inputChannel = "toTcpCh01")
public TcpSendingMessageHandler tcpOutGateCh01(
final @Qualifier("node1TCPConnection") AbstractClientConnectionFactory connectionFactory) {
final TcpSendingMessageHandler tcpSendingMsgHandler = new TcpSendingMessageHandler();
tcpSendingMsgHandler.setConnectionFactory(connectionFactory);
return tcpSendingMsgHandler;
}
и используя шлюз:
@MessagingGateway()
public interface MessageTcpGateway {
@Gateway(requestChannel = "toTcpCh01")
ListenableFuture<Void> sendTcpChannel01(@Header("host") String host, byte[] inMessage);
}
мы отправим его обратно.
На примере я могу понять, как динамически создавать поток для ответа.
Но я не могу понять, как создать общий пул подключений, а затем создавать прослушивающие и ответные адаптеры на лету на основе этих соединений, а затем закрывать / удалять их во время выполнения.
Я несколько понимаю, как сделать поток с входящим адаптером благодаря этому вопросу
Нужно ли создавать несколько отдельных IntegrationFlow для каждого адаптера? поэтому все вызовы и ответы могут быть обработаны асинхронно (я могу ошибаться насчет асинхронности)
а затем обрабатывать их отдельно при желании закрыть соединение? например, вызов близко к TcpReceivingChannelAdapter, а затем к TcpSendingMessageHandler и, наконец, отмена регистрации соединения?
1 ответ
Я не думаю, что для совместной работы адаптеров каналов вам нужно IntegrationFlow
определения для TcpReceivingChannelAdapter
а также TcpSendingMessageHandler
, Это действительно можно сделать как единый IntegrationFlow
начиная с TcpReceivingChannelAdapter
и заканчивая TcpSendingMessageHandler
, Дело в том, что IntegrationFlow
По сути, это просто логический контейнер для группировки ссылок на компоненты. Тяжелая работа действительно выполняется всеми теми компонентами, которые вы декларируете там, и с этим TcpReceivingChannelAdapter
в TcpSendingMessageHandler
и шлюз между вами действительно будет асинхронным.
Пожалуйста, имейте в виду, что ByteArrayLengthHeaderSerializer
также должен быть объявлен как боб. Не уверен, что вам нужен отдельный экземпляр для каждого динамического потока, но вот API для этого в любом случае:
/**
* Add an object which will be registered as an {@link IntegrationFlow} dependant bean in the
* application context. Usually it is some support component, which needs an application context.
* For example dynamically created connection factories or header mappers for AMQP, JMS, TCP etc.
* @param bean an additional arbitrary bean to register into the application context.
* @return the current builder instance
*/
IntegrationFlowRegistrationBuilder addBean(Object bean);