Проблемы с маршрутизацией сообщений на отдельные errorQueue

У меня есть MessageBean, который читает из очереди, которую мы назовем MainQ. Если при выполнении кода onMessage создается пользовательское исключение с типом, который мы назовем UserException, я хочу перехватить это и поместить это сообщение в отдельную очередь с именем UserErrorQ. Если исключение не относится к этому типу, исключение генерируется для обработки DMQ.

Вот моя проблема:

  • в моем блоке перехвата я пытаюсь через ErrorQueueHandler поместить это новое сообщение в UserErrorQ. Это приводит к ошибке при попытке подключиться к connectionFactory для отправки сообщения в UserErrorQ.
  • Очевидно, что создание нового соединения с QueueConnectionFactory(javax.jms.ConnectionFactory) вызывает проблемы

Ошибка:

com.sun.messaging.jms.JMSException: MQRA:DCF:allocation failure:createConnection:Error in allocating a connection. Cause: javax.transaction.RollbackException 
at com.sun.messaging.jms.ra.DirectConnectionFactory._allocateConnection(DirectConnectionFactory.java:548)
at com.sun.messaging.jms.ra.DirectConnectionFactory.createConnection(DirectConnectionFactory.java:265)
at com.sun.messaging.jms.ra.DirectConnectionFactory.createConnection(DirectConnectionFactory.java:244)`

MessageBean:

@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void onMessage(Message message) {
   try{
    .
    .
   }catch(Exception e){
       if(isUserExceptionWrappedInException(e){
           errorQueueHandler.sendToErrorQueue(message);
       }
   }
}

private boolean isUserExceptionWrappedInException(Throwable t) {
    if (t == null)
        return false;
    else if (t instanceof UserException)
        return true;
    else
        return isUserExceptionWrappedInException(t.getCause());
}

ErrorQueueHandler:

public void sendToErrorQueue(Message message) {
    try {
        createConnection();
        send((TextMessage)message);
    } finally {
        closeConnection();
    }
}

private void createConnection() throws Exception {
    try {
        connection = connectionfactory.createConnection();
        connection.start();
    } catch (JMSException e) {
        String msg = "Error while attempting to initialize connection to jms destination " + ERROR_QUEUE;
        throw new OperationalException(msg, e, OperationalExceptionType.APPLIKASJONSTJENER);
    }
}

Как уже упоминалось, ошибка возникает при попытке установить соединение. У кого-нибудь есть решение для этого?

1 ответ

Решение

Итак, я разобрался с ответом на свой вопрос. Причиной исключения connectionException было то, что ErrorQueueHandler был не EJB, а скорее внедрен через CDI. В состоянии отката не разрешено создавать новые экземпляры, поскольку контейнер отбрасывает экземпляр компонента, поэтому он не выполнен. Моя аннотация REQUIRES_NEW также была проигнорирована, поскольку она относится к javax api, что не повлияет на компонент, внедренный в CDI.

Вот несколько вещей, на которые стоит обратить внимание:

  1. Убедитесь, что в EJB нет ни конструкторов, ни общедоступных. Модификаторы важны, так как контейнеру нужно, чтобы они были правильными для создания экземпляра EJB.

  2. Есть несколько проблем с этим подходом.

    • Поскольку я пытаюсь записать сообщение в отдельную очередь ошибок вместо DMQ, мне придется использовать это сообщение и не выдавать ошибку впоследствии. Поскольку MDB находится в состоянии отката, в спецификации JMS четко указано, что это приведет к повторной доставке сообщения. Что вы испытаете, так это то, что после написания вам пользовательского сообщения об ошибке Queue сообщение будет возвращено обратно в очередь, и теперь у вас будет бесконечный цикл.

К счастью, у меня также есть решение: главная проблема здесь - контроль ваших транзакций. Для этого сценария мне нужно 3 транзакции:

  1. Одна транзакция для MDB, чтобы он мог подтвердить событие сообщения, хотя у меня есть RuntimeException.
  2. Одна транзакция для логики метода onMessage, так что я могу выполнить откат при получении исключения, но при этом иметь возможность записи в ErrorQueue.
  3. Одна транзакция для подключения и записи в ErrorQueue в состоянии отката.

Код:

MessageBean:

@EJB
QueueService queueService;

@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void onMessage(Message message) {
  try{
    queueService.processMessageInNewTrasaction(message);
  }catch(Exception e){
    throw e;
  }
}

QueueService:

import javax.jms.Message;
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
@Stateless
public class QueueService {

  @EJB
  ErrorQueueHandler errorQueueHandler;

  public void processMessageInNewTransaction(Message message){
    try {
    .
    .
    } catch(Exception e) {
      if(isUserExceptionWrappedInException(e)
        errorQueueHandler.sendToErrorQueue(message);
    }
  }

  private boolean isUserExceptionWrappedInException(Throwable t) {
    if (t == null)
      return false;
    else if (t instanceof UserException)
      return true;
    else
      return isUserExceptionWrappedInException(t.getCause());
  }

}

ErrorQueueHandler:

@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
@Stateless
public class ErrorQueueHandler{
   public void sendToErrorQueue(Message message){
   .
   .
   }
}

полезный ресурс: http://weblogic-wonders.com/weblogic/2011/01/10/working-with-jms-and-the-standard-issues-in-jms/

Другие вопросы по тегам