ServiceBus over AMQP закрывает потребителя
У нас есть локальная установка ServiceBus и Java JMS QPID-клиента 0.26. Кажется, в SB есть ошибка - он отправляет команду END потребителю, когда производитель закрывает соединение. И производитель, и потребитель должны работать на одной машине.
Первый стартовый потребитель, который прослушивает очередь:
static void consumeFromQueueForStackru(Connection connection, Context context)
throws JMSException, NamingException, InterruptedException {
Session session = null;
MessageConsumer consumer = null;
long RECEIVE_TIMEOUT_MS = 30000;
try {
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = (Queue) context.lookup("JBA_QUEUE");
consumer = session.createConsumer(queue);
connection.start();
int consumed = 0;
while (true) {
long startMS = System.currentTimeMillis();
Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
if (message != null) {
consumed++;
message.acknowledge();
continue;
}
long durationMS = System.currentTimeMillis() - startMS;
if (durationMS < RECEIVE_TIMEOUT_MS) {
log.info(String.format(
"Connection is closed, timeout: %d[ms], waited: %s[ms] (consumed: %d)",
RECEIVE_TIMEOUT_MS, durationMS, consumed));
break;
}
log.info(String.format("Receive timeout, retrying (consumed: %d)", consumed));
consumed = 0;
}
} finally {
connection.stop();
if(null != consumer)
consumer.close();
if(null != session)
session.close();
}
}
Затем отправьте 1 сообщение в очередь:
static void publishToQueueForStackru(Connection connection, Context context)
throws JMSException, NamingException, InterruptedException {
Session session = null;
MessageProducer producer = null;
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) context.lookup("JBA_QUEUE");
producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage("My Message");
producer.send(message);
} finally {
if(producer != null)
producer.close();
if(session != null)
session.close();
}
log.info(String.format("Sent %d messages", count));
}
Как только процесс производителя завершится, потребитель также завершит работу с временем ожидания, меньшим, чем время ожидания. Метод receive(timeout) возвращает значение null до истечения срока ожидания (javadoc), "потребитель сообщения одновременно закрыт". В качестве альтернативы вы можете увидеть IllegalStateException: Закрыто во время вызова для подтверждения ().
Вот журнал от потребителя. Вы можете видеть, что сервер отправил неожиданный конец {} клиенту:
RECV[5671|0] : Open{containerId=087d0b7b8a8e4809a686f8b20d5376f5_GPRGXIT002,maxFrameSize=65536,channelMax=255,idleTimeOut=240000}
SEND[5671|0] : null
SEND[5671|0] : Begin{nextOutgoingId=0,incomingWindow=2048,outgoingWindow=2048,handleMax=4294967295}
SEND[5671|0] : Attach{name=IntegrationServiceBus/jba_testing_queue-> (48ff030e-c7be-42b2-9c22-4d0db13aec8f),handle=0,role=receiver,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=IntegrationServiceBus/jba_testing_queue,durable=none,expiryPolicy=link-detach},target=Target{}}
RECV[5671|0] : Begin{remoteChannel=0,nextOutgoingId=1,incomingWindow=2048,outgoingWindow=2048,handleMax=7}
RECV[5671|0] : Attach{name=IntegrationServiceBus/jba_testing_queue-> (48ff030e-c7be-42b2-9c22-4d0db13aec8f),handle=0,role=sender,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=IntegrationServiceBus/jba_testing_queue,durable=none,expiryPolicy=link-detach},target=Target{},initialDeliveryCount=0,maxMessageSize=262144,properties={com.microsoft:tracking-id=087d0b7b8a8e4809a686f8b20d5376f5_GPRGXIT002_BPRGXIT003;2411:54:55}}
SEND[5671|0] : Flow{nextIncomingId=1,incomingWindow=2048,nextOutgoingId=0,outgoingWindow=2048,handle=0,deliveryCount=0,linkCredit=100,drain=false,echo=false}
RECV[5671|0] : Transfer{handle=0,deliveryId=0,deliveryTag=\x84\xb8.\xf5\xda3\xafF\x89<J\x1bj\xda{<,messageFormat=0,more=false,batchable=true}
RECV[5671|0] : End{}
SEND[5671|0] : Detach{handle=0}
SEND[5671|0] : Disposition{role=receiver,first=0,last=0,settled=true,state=Released{}}
SEND[5671|0] : End{}
SEND[5671|0] : End{}
SEND[5671|0] : Close{}
RECV[5671|0] : Close{}
1 ответ
Это ошибка брокера. Обходным решением является использование sync_publish='all', подробнее см. ServiceBus over AMQP/QPID. Клиент закрывает потребителя.