Запрос относительно Spring-Driven-Channel-Adapter
Я использую управляемый сообщениями-канал-адаптер Spring. Мой компонент потребляет сообщение от темы Tibco и публикации в тему RabbitMQ
Итак, поток сообщений выглядит следующим образом: Tibco-> (подписанный) Компонент (Опубликовано)-> RabbitMQ
Активатор службы показан ниже: как мы видим, есть канал ввода и канал вывода. Бин storeAndForwardActivator будет иметь бизнес-логику (в методе createIssueOfInterestOratorRecord)
<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />
У меня также есть сообщение = ведомый-канал-адаптер. Этот адаптер будет вызван до вызова сервисного адаптера.
<int-jms:message-driven-channel-adapter
id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
container="oratorIssueOfInterestmessageListenerContainer" />
то есть конкретно контейнер (показанный ниже) будет содержать имя темы для использования - это DefaultMessageListenerContainer
<bean id="oratorIssueOfInterestmessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
<property name="destination" ref="oratorTibcojmsDestination" />
<property name="sessionTransacted" value="true" />
<property name="maxConcurrentConsumers" value="1" />
<property name="concurrentConsumers" value="1" />
<property name="receiveTimeout" value="5000" />
<property name="recoveryInterval" value="60000" />
<property name="autoStartup" value="true" />
<property name="exposeListenerSession" value="false" />
<property name="subscriptionDurable" value="true" />
<property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
<property name="messageSelector" value="${topic.orator.selector}" />
</bean>
Эта установка отлично работает. Однако в некоторых случаях мой потребитель / компонент получает сообщение "мошенник". т.е. пустая полезная нагрузка или тип сообщения HashMap (вместо простого TextMessage) - когда мы получаем это - что я наблюдаю - исключение перехватывается на уровне DefaultMessageListener (т.е. я не захожу так далеко, как мой бизнес-компонент, т.е. storeAndForwardActivator), из-за этого мой компонент не отправляет ACK обратно - и, поскольку это длительная тема, в теме есть сборка сообщений, что нежелательно. Есть ли способ для меня ACK сообщение сразу, независимо от погоды, исключение перехватывается на уровне DefaultMessageListener?
Или я должен ввести обработчик ошибок в DefaultMessageListener? Какой лучший способ справиться с этим, какие-либо предложения?
С уважением D
Обновить:
Я попытался добавить errorHandler в org.springframework.jms.listener.DefaultMessageListenerContainer, как показано ниже
<bean id="oratorIssueOfInterestmessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
<property name="destination" ref="oratorTibcojmsDestination" />
<property name="sessionTransacted" value="true" />
<property name="maxConcurrentConsumers" value="1" />
<property name="concurrentConsumers" value="1" />
<property name="receiveTimeout" value="5000" />
<property name="recoveryInterval" value="60000" />
<property name="autoStartup" value="true" />
<property name="exposeListenerSession" value="false" />
<property name="subscriptionDurable" value="true" />
<property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
<property name="messageSelector" value="${topic.orator.selector}" />
<property name="errorHandler" ref="myErrorHandler"/>
</bean>
myErrorHandler - это bean-компонент как shpwn ниже
<bean id="myErrorHandler"
class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />
MyErroHandler реализует ErrorHandler
@Service
public class MyErrorHandler implements ErrorHandler{
private static Log log = LogFactory.getLog(MyErrorHandler.class);
@Override
public void handleError(Throwable t) {
if (t instanceof MessageHandlingException) {
MessageHandlingException exception = (MessageHandlingException) t;
if (exception != null) {
org.springframework.messaging.Message<?> message = exception.getFailedMessage();
Object payloadObject = message.getPayload();
if (null != payloadObject) {
log.info("Payload is not null, type is: " + payloadObject.getClass());
}
}
} else {
log.info("Exception is not of type: MessageHandlingException ");
}
}
}
Что я замечаю, так это то, что исключение перехватывается (когда подписчик использует мошенническое сообщение). Я продолжаю видеть этот журнал в цикле
Exception is not of type: MessageHandlingException
Exception is not of type: MessageHandlingException
Exception is not of type: MessageHandlingException
то есть, поскольку транзакция не зафиксирована - одно и то же сообщение из длительной темы используется снова и снова. Моя цель - отправить ACK обратно брокеру после получения сообщения (независимо от того, поймали ли исключение или нет).
Завтра попробую канал ошибок.
С уважением D
1 ответ
Добавить error-channel
к адаптеру, управляемому сообщениями; ErrorMessage
будет содержать MessagingException
полезная нагрузка с двумя полями; cause
(исключение) и failedMessage
,
Если вы используете по умолчанию error-channel="errorChannel"
, исключение регистрируется.
Если вы хотите сделать больше, вы можете настроить свой собственный канал ошибок и добавить к нему поток.
РЕДАКТИРОВАТЬ:
В дополнение к вашим комментариям ниже...
payload must not be null
не является трассировкой стека; это сообщение.
Это сказало, payload must not be null
выглядит как сообщение Spring Integration; он, вероятно, добавляется в адаптер прослушивателя сообщений во время преобразования сообщения, а именно до того, как мы доберемся до точки, где сбой может перейти к error-channel
; такое исключение будет возвращено в контейнер.
Включите ведение журнала DEBUG и найдите эту запись в журнале:
logger.debug("converted JMS Message [" + jmsMessage + "] to integration Message payload [" + result + "]");
Также предоставьте полную трассировку стека.
EDIT # 2
Итак, я воспроизвел вашу проблему, заставив преобразованную полезную нагрузку равной нулю в пользовательском MessageConverter
,
Обработчик ошибок DMLC вызывается контейнером после отката транзакции, поэтому нет способа остановить откат.
Мы можем добавить опцию к адаптеру, чтобы по-разному обрабатывать такие ошибки, но это потребует некоторой работы.
Тем временем, обходной путь должен был бы написать кастом MessageConverter
; что-то вроде того, что в этом Gist.
Затем вашему сервису придется иметь дело с обработкой полезных данных "Bad Message Received".
Затем вы предоставляете пользовательский конвертер, как это...
<jms:message-driven-channel-adapter id="jmsIn"
destination="requestQueue" acknowledge="transacted"
message-converter="converter"
channel="jmsInChannel" />
<beans:bean id="converter" class="foo.MyMessageConverter" />