При получении сообщения MessageDeliveryException: у Диспетчера нет подписчиков
ОБНОВЛЕНИЕ: После перехода от класса Tcp() к классу держателей к классу конечных точек все снова заработало, теперь я озадачен, почему это работает даже сейчас, потому что для меня это не имеет смысла, почему это работает
После некоторых головных болей и поиска во всем стековом потоке у меня есть диспетчер, у которого нет подписчика, когда я получаю сообщение от сервера, но не когда я отправляю ответ, где все работает нормально.
генератор подключения:
@Service
@EnableIntegration
public class TcpConnectionsHolder {
@Autowired
private IntegrationFlowContext flowContext;
/**
* Definition of flow channels
*
* @return MessageChannel
*/
@Bean
public MessageChannel fromTcp() {
final DirectChannel channel = new DirectChannel();
channel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
// Parse Message byte[] to StringHex
final byte[] bMessagePayload = (byte[]) message.getPayload();
return MessageBuilder.withPayload(Hex.encodeHexString(bMessagePayload))
.copyHeaders(message.getHeaders()).build();
}
});
return channel;
}
private final LinkedHashMap<String, TcpNetClientConnectionFactory> clientConnect =
new LinkedHashMap<String, TcpNetClientConnectionFactory>();
private final LinkedHashMap<String, TcpReceivingChannelAdapter> reciverAdapter =
new LinkedHashMap<String, TcpReceivingChannelAdapter>();
private final LinkedHashMap<String, MessageChannel> sendingAdpater =
new LinkedHashMap<String, MessageChannel>();
public MessageChannel getMessageChannel(String host, int port) {
return sendingAdpater.get(host+port);
}
public TcpReceivingChannelAdapter getReceiverChannel(String host, int port) {
return reciverAdapter.get(host+port);
}
private TcpNetClientConnectionFactory getclientConnectionFactory(String host, int port,int headBytes) {
TcpNetClientConnectionFactory cf = clientConnect.get(host+port);
if(cf==null) {
cf = new TcpNetClientConnectionFactory(host, port);
final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);
cf.setSingleUse(false);
cf.setSoKeepAlive(true);
cf.setSerializer(by);
cf.setDeserializer(by);
clientConnect.put(host+port,cf);
}
return cf;
}
public TcpReceivingChannelAdapter addReceiverChannel(String host, int port) {
return addReceiverChannel(host,port,2,2000);
}
public TcpReceivingChannelAdapter addReceiverChannel(String host, int port,int headBytes,int retryInterval) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
TcpNetClientConnectionFactory cf = getclientConnectionFactory(host,port,headBytes);
adapter.setConnectionFactory(cf);
adapter.setClientMode(true);
adapter.setErrorChannelName("errorChannel");
adapter.setRetryInterval(retryInterval);
adapter.setOutputChannel(fromTcp());
IntegrationFlow flow = IntegrationFlows.from(adapter).get();
this.flowContext.registration(flow).id(host+port + ".in").addBean(cf).register();
this.reciverAdapter.put(host+port, adapter);
return adapter;
}
public MessageChannel addSendingChannel(String host, int port) {
return addSendingChannel(host,port,2);
}
public MessageChannel addSendingChannel(String host, int port,int headBytes) {
TcpSendingMessageHandler sender = new TcpSendingMessageHandler();
sender.setConnectionFactory(getclientConnectionFactory(host,port,headBytes));
IntegrationFlow flow = f -> f.handle(sender);
IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow).id(host+port + ".out").register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.sendingAdpater.put(host+port, inputChannel);
return inputChannel;
}
public void removeReceiverChannel(String host, int port) {
this.reciverAdapter.remove(host+port);
this.flowContext.remove(host+port + ".in");
}
public void removeSendingChannel(String host, int port) {
this.sendingAdpater.remove(host+port);
this.flowContext.remove(host+port + ".out");
}
}
конечная точка сообщения:
@Configuration
@MessageEndpoint
public class BridgeMessageEndpoint {
private static final Logger LOGGER = LoggerFactory.getLogger(BridgeMessageEndpoint.class);
@Autowired
private ApplicationContext applicationContext;
@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
sendToApi(inMessage, headerMap);
}
private void sendToApi(final String inMessage, final Map<String, Object> headerMap) {
LOGGER.debug("Recuperando el mensaje Hex {}", inMessage);
final PaymentOrder paymentOrder = new PaymentOrder();
paymentOrder.setMessage(inMessage);
final SplitterRestClient splitterRestClient = applicationContext.getBean(SplitterRestClient.class);
splitterRestClient.reportPayment(paymentOrder, headerMap);
}
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler logger() {
final LoggingHandler loggingHandler = new LoggingHandler(LoggingHandler.Level.DEBUG.name());
loggingHandler.setLoggerName("Log");
return loggingHandler;
}
@Bean
public IntegrationFlow toTcp() {
return f -> f.route(new TcpRouter());
}
}
а затем инициация держателя:
@Component
public class TcpConnectionsController implements CommandLineRunner{
@Autowired
private TcpConnectionsHolder holder;
@Autowired
private ListNodeConfig listNodes;
@Value("${socket.tcp.headBytes}")
private int headBytes;
@Value("${socket.tcp.retryInterval}")
private int retryInterval;
@Override
public void run(String... args) throws Exception {
for(Node node:listNodes.getNodes()) {
holder.addReceiverChannel(node.getIp(), node.getPort(),headBytes,retryInterval);
holder.addSendingChannel(node.getIp(), node.getPort(),headBytes);
}
}
}
Я знаю, что проблема в fromTcp(), но, пока я ищу, я не могу увидеть, где находится проблема или ошибка.
РЕДАКТИРОВАТЬ Полная трассировка стека:
[INFO ] (DefaultLifecycleProcessor.java:343) org.springframework.context.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018/06/22 13:56:47,454 [INFO ] (DocumentationPluginsBootstrapper.java:151) springfox.documentation.spring.web.plugins.DocumentationPluginsBootstrapper : Context refreshed
2018/06/22 13:56:47,504 [INFO ] (DocumentationPluginsBootstrapper.java:154) springfox.documentation.spring.web.plugins.DocumentationPluginsBootstrapper : Found 1 custom documentation plugin(s)
2018/06/22 13:56:47,555 [INFO ] (ApiListingReferenceScanner.java:41) springfox.documentation.spring.web.scanners.ApiListingReferenceScanner : Scanning for api listing references
2018/06/22 13:56:47,826 [INFO ] (DirectJDKLog.java:180) org.apache.coyote.http11.Http11NioProtocol : Initializing ProtocolHandler ["http-nio-8080"]
2018/06/22 13:56:47,827 [INFO ] (DirectJDKLog.java:180) org.apache.coyote.http11.Http11NioProtocol : Starting ProtocolHandler ["http-nio-8080"]
2018/06/22 13:56:47,829 [INFO ] (DirectJDKLog.java:180) org.apache.tomcat.util.net.NioSelectorPool : Using a shared selector for servlet write/read
2018/06/22 13:56:47,836 [INFO ] (TomcatEmbeddedServletContainer.java:201) org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2018/06/22 13:56:47,902 [INFO ] (AbstractConnectionFactory.java:481) org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory : started 180.112.19.1153115.inorg.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory#0, host=180.112.19.115, port=3115
2018/06/22 13:56:47,922 [INFO ] (AbstractEndpoint.java:120) org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter : started org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#0
2018/06/22 13:56:47,949 [INFO ] (EventDrivenConsumer.java:108) org.springframework.integration.endpoint.EventDrivenConsumer : Adding {ip:tcp-outbound-channel-adapter} as a subscriber to the '180.112.19.1153115.out.input' channel
2018/06/22 13:56:47,950 [INFO ] (AbstractSubscribableChannel.java:81) org.springframework.integration.channel.DirectChannel : Channel 'ck-da-bridge:local:8080.180.112.19.1153115.out.input' has 1 subscriber(s).
2018/06/22 13:56:47,950 [INFO ] (AbstractConnectionFactory.java:481) org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory : started 180.112.19.1153115.inorg.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory#0, host=180.112.19.115, port=3115
2018/06/22 13:56:47,950 [INFO ] (AbstractEndpoint.java:120) org.springframework.integration.endpoint.EventDrivenConsumer : started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
2018/06/22 13:56:47,953 [INFO ] (StartupInfoLogger.java:57) com.santander.ck.bridge.spring.Application : Started Application in 13.078 seconds (JVM running for 14.159)
2018/06/22 13:57:01,164 [ERROR ] (LoggingHandler.java:192) org.springframework.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'ck-da-bridge:local:8080.180.112.19.1153115.in.channel#0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=f0f8f0f0c2200000800000020400000000000000f0f5f9f9f1f0f3f0f6f2f2f1f3f5f6f5f5f0f0f0f0f1f2f0f6f0f2f2f0f2f0f0f0f9d4c3c3f0f1f1f1e3f0f0f8f2, headers={ip_tcp_remotePort=3115, ip_connectionId=amm012059amm.scisb.isban.corp:3115:53402:3722e84f-49ce-4e77-a4a1-6e82cefe58b4, ip_localInetAddress=/180.101.130.119, ip_address=180.112.19.115, id=2c6e2e09-9faa-a56e-3921-edf7a03a9a3c, ip_hostname=amm012059amm.scisb.isban.corp, timestamp=1529668621142}], failedMessage=GenericMessage [payload=f0f000022f0f6e3f0f0f8f2, headers={ip_tcp_remotePort=3115, ip_connectionId=amm012059amm.scisb.isban.corp:3115:53402:3722e84f-49ce-4e77-a4a1-6e82cefe58b4, ip_localInetAddress=/180.101.130.119, ip_address=180.112.19.115, id=2c6e2e09-9faa-a56e-3921-edf7a03a9a3c, ip_hostname=amm012059amm.scisb.isban.corp, timestamp=1529668621142}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter.onMessage(TcpReceivingChannelAdapter.java:88)
at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=f0f8f0f0c2200000800000020400000000000000f0f5f9f9f1f0f3f0f6f2f2f1f3f5f6f5f5f0f0f0f0f1f2f0f6f0f2f2f0f2f0f0f0f9d4c3c3f0f1f1f1e3f0f0f8f2, headers={ip_tcp_remotePort=3115, ip_connectionId=amm012059amm.scisb.isban.corp:3115:53402:3722e84f-49ce-4e77-a4a1-6e82cefe58b4, ip_localInetAddress=/180.101.130.119, ip_address=180.112.19.115, id=2c6e2e09-9faa-a56e-3921-edf7a03a9a3c, ip_hostname=amm012059amm.scisb.isban.corp, timestamp=1529668621142}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
... 11 more
1 ответ
Вы можете включить уровень ведения журнала DEBUG для org.springframework.integration
категория и проследить, как проходит сообщение.
Также существует шаблон истории сообщений, позволяющий отслеживать путь сообщения в заголовках. Таким образом, вы сможете увидеть, какие каналы прошли ваши сообщения и где они застряли с Dispatcher has no subscribers
,
Хотя вам нужно будет перезагрузить MessageHistoryConfigurer
после каждого IntegrationFlow
постановка на учет.
Не стесняйтесь поднимать JIRA по этому вопросу, чтобы позволить истории сообщений отслеживать динамические потоки.
ОБНОВИТЬ
ХОРОШО. Имея ваш простой код вроде этого:
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
TcpNetClientConnectionFactory cf = getclientConnectionFactory(host,port,headBytes);
adapter.setConnectionFactory(cf);
adapter.setClientMode(true);
adapter.setErrorChannelName("errorChannel");
adapter.setRetryInterval(retryInterval);
adapter.setOutputChannel(fromTcp());
IntegrationFlow flow = IntegrationFlows.from(adapter).get();
И тот факт, что больше нет компонентов после этого handler
(трассировка стека adn действительно подтверждает это), я предполагаю, что вы используете некоторую версию Spring Integration Java DSL без надлежащего исправления для извлечения outputChannel
из предоставленного MessageProducer.getOutputChannel()
,
Чтобы исправить это, я предлагаю вам изменить код следующим образом:
IntegrationFlow flow = IntegrationFlows.from(adapter).channel(fromTcp()).get();
Я имею в виду перенести ссылку на канал из TcpReceivingChannelAdapter
определение потока. Или просто попробуйте обновить до последней 1.2.3
версия для spring-integration-java-dsl
зависимость!
ОБНОВЛЕНИЕ 2
Еще одна мысль о коде у нас пока.
Вы регистрируете TcpReceivingChannelAdapter
от CommandLineRunner
, Таким образом, IntegrationFlow
запускается автоматически и готов к приему данных через TCP, но в то же время @ServiceActivator(inputChannel = "fromTcp")
пока не запущен прием сообщений с указанного канала. Вот как вы это получаете Dispatcher has no subscribers
, Или вы должны испортить свой this.flowContext.registration(flow)
с autoStartup(false)
и позвольте ему запускаться в последнее время во время нормальной фазы автозапуска, уже вместе с активатором службы. Или вы не должны использовать CommandLineRunner
подход к такой логике: слишком рано начинать какую-то деятельность.