Spring Integration: Исключение Splitter вызывает прерывание последующих сообщений
У меня есть следующая конфигурация, где я пытаюсь обработать список сообщений, которые разделены разделителем. Проблема, с которой я сталкиваюсь, состоит в том, что исключение в одной из отдельных обработок сообщений приводит к тому, что все последующие сообщения НЕ обрабатываются. Что-то я делаю не так?
<int:chain input-channel="exceptionTestChannel">
<int:splitter/>
<int:header-enricher>
<int:error-channel ref="myErrorChannel"/>
</int:header-enricher>
<int:service-activator id="testExceptionService"
ref="testExceptionService" method="throwException"></int:service-activator>
<int:aggregator></int:aggregator>
</int:chain>
<int:channel id="myErrorChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<int:channel id="exceptionTestChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<int:transformer ref="errorUnwrapper" input-channel="myErrorChannel" />
Исключение ниже
2014-11-12 09:56:30,076 ERROR [com.test.transform.ErrorUnwrapper] - [Payload=org.springframework.integration.MessageHandlingException: com.test.TestServiceException: System Error. Trial test exception][Headers={timestamp=1415814990076, id=a2f50a6d-4de6-3785-e8d8-a5ef9f46c27f}]
org.springframework.integration.MessageHandlingException: com.test.TestServiceException: System Error. Trial test exception
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:76)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:67)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:167)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:131)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:178)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:149)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:97)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:199)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:141)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:273)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:268)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:452)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:314)
at java.util.concurrent.FutureTask.run(FutureTask.java:149)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:109)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:218)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:883)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:919)
at java.lang.Thread.run(Thread.java:736)
Caused by: com.test.TestServiceException: System Error. Trial test exception
at com.test.TestServiceException.getSystemErrorInstance(TestServiceException.java:31)
at com.test.TestExceptionService.throwException(TestExceptionService.java:13)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37)
at java.lang.reflect.Method.invoke(Method.java:611)
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:69)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:122)
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:44)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:258)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:82)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:103)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:144)
at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:268)
at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
... 50 more
2014-11-12 09:56:30,076 WARN [org.springframework.integration.channel.MessagePublishingErrorHandler] - Error message was not delivered.
org.springframework.integration.support.channel.ChannelResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:218)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:178)
at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:83)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:268)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:452)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:314)
at java.util.concurrent.FutureTask.run(FutureTask.java:149)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:109)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:218)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:883)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:919)
at java.lang.Thread.run(Thread.java:736)
2 ответа
исключение в одной из отдельных обработок сообщений приводит к тому, что все последующие сообщения НЕ обрабатываются
Вы должны согласиться с тем, что это нормальное поведение для любого однопоточного процесса: когда где-то вызывается исключение, весь другой код вызываться не будет.
Простой split
похоже for (Object o : collection)
,
Для достижения ваших требований вы должны использовать ExecutorChannel
(или же QueueChannel
) как output-channel
за <splitter>
,
Конечно, в этом случае вы должны потянуть splitter
вне <chain>
,
С этим изменением потоков ваши сообщения не будут влиять друг на друга.
@Artem верен, но другой альтернативой, если вы хотите, чтобы все выполнялось в одном потоке, является вставка промежуточного потока шлюза.
Просто добавить заголовок канала с ошибками, как это не будет работать...
<chain>
<splitter />
<service-activator ref="gw" />
<chain>
<gateway id="gw" request-channel="foo" error-channel="errors"
reply-timeout="0" />
<logging-channel-adapter channel="errors" />
<chain input-channel="foo">
<int:service-activator id="testExceptionService"
ref="testExceptionService" method="throwException"></int:service-activator>
<int:aggregator></int:aggregator>
</chain>
Тем не менее, удаление таких разделений приведет к тому, что агрегатор по умолчанию никогда не завершит группу, поэтому вам нужна специальная стратегия выпуска, которая будет освобождать группу даже при отсутствии разделений.