Запрос относительно Spring-Driven-Channel-Adapter

Я использую управляемый сообщениями-канал-адаптер Spring. Мой компонент потребляет сообщение от темы Tibco и публикации в тему RabbitMQ

Итак, поток сообщений выглядит следующим образом: Tibco-> (подписанный) Компонент (Опубликовано)-> RabbitMQ

Активатор службы показан ниже: как мы видим, есть канал ввода и канал вывода. Бин storeAndForwardActivator будет иметь бизнес-логику (в методе createIssueOfInterestOratorRecord)

<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
    ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
    output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />

У меня также есть сообщение = ведомый-канал-адаптер. Этот адаптер будет вызван до вызова сервисного адаптера.

<int-jms:message-driven-channel-adapter
    id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
    container="oratorIssueOfInterestmessageListenerContainer" />

то есть конкретно контейнер (показанный ниже) будет содержать имя темы для использования - это DefaultMessageListenerContainer

<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />
</bean>

Эта установка отлично работает. Однако в некоторых случаях мой потребитель / компонент получает сообщение "мошенник". т.е. пустая полезная нагрузка или тип сообщения HashMap (вместо простого TextMessage) - когда мы получаем это - что я наблюдаю - исключение перехватывается на уровне DefaultMessageListener (т.е. я не захожу так далеко, как мой бизнес-компонент, т.е. storeAndForwardActivator), из-за этого мой компонент не отправляет ACK обратно - и, поскольку это длительная тема, в теме есть сборка сообщений, что нежелательно. Есть ли способ для меня ACK сообщение сразу, независимо от погоды, исключение перехватывается на уровне DefaultMessageListener?

Или я должен ввести обработчик ошибок в DefaultMessageListener? Какой лучший способ справиться с этим, какие-либо предложения?

С уважением D

Обновить:

Я попытался добавить errorHandler в org.springframework.jms.listener.DefaultMessageListenerContainer, как показано ниже

<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />

    <property name="errorHandler" ref="myErrorHandler"/>
</bean>

myErrorHandler - это bean-компонент как shpwn ниже

<bean id="myErrorHandler"
    class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />

MyErroHandler реализует ErrorHandler

 @Service
 public class MyErrorHandler implements ErrorHandler{

private static Log log = LogFactory.getLog(MyErrorHandler.class);

@Override
   public void handleError(Throwable t) {

        if (t instanceof MessageHandlingException) {
            MessageHandlingException exception = (MessageHandlingException) t;
            if (exception != null) {
                org.springframework.messaging.Message<?> message = exception.getFailedMessage();
                Object payloadObject = message.getPayload();
                if (null != payloadObject) {
                    log.info("Payload  is not null, type is: " + payloadObject.getClass());
                }
            }
        } else {
            log.info("Exception is not of type: MessageHandlingException ");
        }
}

}

Что я замечаю, так это то, что исключение перехватывается (когда подписчик использует мошенническое сообщение). Я продолжаю видеть этот журнал в цикле

    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 

то есть, поскольку транзакция не зафиксирована - одно и то же сообщение из длительной темы используется снова и снова. Моя цель - отправить ACK обратно брокеру после получения сообщения (независимо от того, поймали ли исключение или нет).

Завтра попробую канал ошибок.

С уважением D

1 ответ

Решение

Добавить error-channel к адаптеру, управляемому сообщениями; ErrorMessage будет содержать MessagingException полезная нагрузка с двумя полями; cause (исключение) и failedMessage,

Если вы используете по умолчанию error-channel="errorChannel", исключение регистрируется.

Если вы хотите сделать больше, вы можете настроить свой собственный канал ошибок и добавить к нему поток.

РЕДАКТИРОВАТЬ:

В дополнение к вашим комментариям ниже...

payload must not be null не является трассировкой стека; это сообщение.

Это сказало, payload must not be null выглядит как сообщение Spring Integration; он, вероятно, добавляется в адаптер прослушивателя сообщений во время преобразования сообщения, а именно до того, как мы доберемся до точки, где сбой может перейти к error-channel; такое исключение будет возвращено в контейнер.

Включите ведение журнала DEBUG и найдите эту запись в журнале:

logger.debug("converted JMS Message [" + jmsMessage + "] to integration Message payload [" + result + "]");

Также предоставьте полную трассировку стека.

EDIT # 2

Итак, я воспроизвел вашу проблему, заставив преобразованную полезную нагрузку равной нулю в пользовательском MessageConverter,

Обработчик ошибок DMLC вызывается контейнером после отката транзакции, поэтому нет способа остановить откат.

Мы можем добавить опцию к адаптеру, чтобы по-разному обрабатывать такие ошибки, но это потребует некоторой работы.

Тем временем, обходной путь должен был бы написать кастом MessageConverter; что-то вроде того, что в этом Gist.

Затем вашему сервису придется иметь дело с обработкой полезных данных "Bad Message Received".

Затем вы предоставляете пользовательский конвертер, как это...

<jms:message-driven-channel-adapter id="jmsIn"
        destination="requestQueue" acknowledge="transacted"
        message-converter="converter"
        channel="jmsInChannel" />

<beans:bean id="converter" class="foo.MyMessageConverter" />
Другие вопросы по тегам