Проблемы с маршрутизацией сообщений на отдельные errorQueue
У меня есть MessageBean, который читает из очереди, которую мы назовем MainQ. Если при выполнении кода onMessage создается пользовательское исключение с типом, который мы назовем UserException, я хочу перехватить это и поместить это сообщение в отдельную очередь с именем UserErrorQ. Если исключение не относится к этому типу, исключение генерируется для обработки DMQ.
Вот моя проблема:
- в моем блоке перехвата я пытаюсь через ErrorQueueHandler поместить это новое сообщение в UserErrorQ. Это приводит к ошибке при попытке подключиться к connectionFactory для отправки сообщения в UserErrorQ.
- Очевидно, что создание нового соединения с QueueConnectionFactory(javax.jms.ConnectionFactory) вызывает проблемы
Ошибка:
com.sun.messaging.jms.JMSException: MQRA:DCF:allocation failure:createConnection:Error in allocating a connection. Cause: javax.transaction.RollbackException
at com.sun.messaging.jms.ra.DirectConnectionFactory._allocateConnection(DirectConnectionFactory.java:548)
at com.sun.messaging.jms.ra.DirectConnectionFactory.createConnection(DirectConnectionFactory.java:265)
at com.sun.messaging.jms.ra.DirectConnectionFactory.createConnection(DirectConnectionFactory.java:244)`
MessageBean:
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void onMessage(Message message) {
try{
.
.
}catch(Exception e){
if(isUserExceptionWrappedInException(e){
errorQueueHandler.sendToErrorQueue(message);
}
}
}
private boolean isUserExceptionWrappedInException(Throwable t) {
if (t == null)
return false;
else if (t instanceof UserException)
return true;
else
return isUserExceptionWrappedInException(t.getCause());
}
ErrorQueueHandler:
public void sendToErrorQueue(Message message) {
try {
createConnection();
send((TextMessage)message);
} finally {
closeConnection();
}
}
private void createConnection() throws Exception {
try {
connection = connectionfactory.createConnection();
connection.start();
} catch (JMSException e) {
String msg = "Error while attempting to initialize connection to jms destination " + ERROR_QUEUE;
throw new OperationalException(msg, e, OperationalExceptionType.APPLIKASJONSTJENER);
}
}
Как уже упоминалось, ошибка возникает при попытке установить соединение. У кого-нибудь есть решение для этого?
1 ответ
Итак, я разобрался с ответом на свой вопрос. Причиной исключения connectionException было то, что ErrorQueueHandler был не EJB, а скорее внедрен через CDI. В состоянии отката не разрешено создавать новые экземпляры, поскольку контейнер отбрасывает экземпляр компонента, поэтому он не выполнен. Моя аннотация REQUIRES_NEW также была проигнорирована, поскольку она относится к javax api, что не повлияет на компонент, внедренный в CDI.
Вот несколько вещей, на которые стоит обратить внимание:
Убедитесь, что в EJB нет ни конструкторов, ни общедоступных. Модификаторы важны, так как контейнеру нужно, чтобы они были правильными для создания экземпляра EJB.
Есть несколько проблем с этим подходом.
- Поскольку я пытаюсь записать сообщение в отдельную очередь ошибок вместо DMQ, мне придется использовать это сообщение и не выдавать ошибку впоследствии. Поскольку MDB находится в состоянии отката, в спецификации JMS четко указано, что это приведет к повторной доставке сообщения. Что вы испытаете, так это то, что после написания вам пользовательского сообщения об ошибке Queue сообщение будет возвращено обратно в очередь, и теперь у вас будет бесконечный цикл.
К счастью, у меня также есть решение: главная проблема здесь - контроль ваших транзакций. Для этого сценария мне нужно 3 транзакции:
- Одна транзакция для MDB, чтобы он мог подтвердить событие сообщения, хотя у меня есть RuntimeException.
- Одна транзакция для логики метода onMessage, так что я могу выполнить откат при получении исключения, но при этом иметь возможность записи в ErrorQueue.
- Одна транзакция для подключения и записи в ErrorQueue в состоянии отката.
Код:
MessageBean:
@EJB
QueueService queueService;
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void onMessage(Message message) {
try{
queueService.processMessageInNewTrasaction(message);
}catch(Exception e){
throw e;
}
}
QueueService:
import javax.jms.Message;
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
@Stateless
public class QueueService {
@EJB
ErrorQueueHandler errorQueueHandler;
public void processMessageInNewTransaction(Message message){
try {
.
.
} catch(Exception e) {
if(isUserExceptionWrappedInException(e)
errorQueueHandler.sendToErrorQueue(message);
}
}
private boolean isUserExceptionWrappedInException(Throwable t) {
if (t == null)
return false;
else if (t instanceof UserException)
return true;
else
return isUserExceptionWrappedInException(t.getCause());
}
}
ErrorQueueHandler:
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
@Stateless
public class ErrorQueueHandler{
public void sendToErrorQueue(Message message){
.
.
}
}
полезный ресурс: http://weblogic-wonders.com/weblogic/2011/01/10/working-with-jms-and-the-standard-issues-in-jms/