Служебная шина - получение сообщения из сеанса по порядковому номеру
В настоящее время я пытаюсь получить конкретное сообщение из сеанса.
Для этого я использую хочу использовать. Получите (Int64) на MessageSession, где я передаю порядковый номер сообщения.
Вот мой код -
long msgSequenceNr = 1337;
QueueClient queueClient = QueueClient.CreateFromConnectionString(Constants.ServiceBusConnectionString, Constants.TestQueueEntityName, ReceiveMode.PeekLock);
MessageSession msgSession = queueClient.AcceptMessageSession(Constants.TestSessionId);
var peekedMsg = msgSession.Peek(msgSequenceNr); // <-- Works fine!
var receivedMsg = msgSession.Receive(msgSequenceNr); // <-- MessageNotFoundException
К сожалению, получение приведет к MessageNotFoundException, в то время как Peek работает нормально. Это ограничение, которое я пропустил, или есть другой способ добиться этого.
Обратите внимание, что в сеансе может быть несколько сообщений.
1 ответ
Прием с SequenceNumber может использоваться только в сочетании с методом Defer. Вот как вы бы это реализовали:
- Сообщение получено, но оно не может быть обработано прямо сейчас (возможно, оно ожидает завершения другого процесса).
- Сохранить номер последовательности в каком-то постоянном хранилище (хранилище таблиц, база данных SQL, ...)
- Когда вы знаете, что обработка может продолжаться (например, зависимый процесс завершен), загрузите все порядковые номера из постоянного хранилища.
- Используйте Receive(int sequenceNumber) или ReceiveBatch(int[] sequenceNumbers) для получения и обработки ваших отложенных сообщений.
Пример приложения: https://code.msdn.microsoft.com/windowsazure/Brokered-Messaging-ccc4f879
Обновить:
Форма вашего комментария Я заметил, что "отстранение" отложенного сообщения может быть решением. Вот некоторый пример кода для открепления сообщения, которое копирует отложенное сообщение в новое сообщение, завершает отложенное сообщение и отправляет новое сообщение обратно в очередь. Это использует TransactionScope для транзакционного завершения и повторной отправки сообщения, чтобы избежать риска потери сообщения:
var messageId = "12434539828282";
// Send.
var msg = new BrokeredMessage {SessionId = "user1", MessageId = messageId };
msg.Properties.Add("Language", "Dutch");
queue.Send(msg);
// Receive.
var session = queue.AcceptMessageSession();
msg = session.Receive();
// Store the sequence number.
var sequenceNumber = msg.SequenceNumber;
// Defer.
msg.Defer();
// Change to true to test if the transaction worked.
var shouldThrow = false;
// Later processing of deferred message.
msg = session.Receive(sequenceNumber);
try
{
using (var ts = new TransactionScope())
{
// Create a new message.
var undeferredMessage = new BrokeredMessage {SessionId = msg.SessionId, MessageId = msg.MessageId};
foreach (var prop in msg.Properties)
undeferredMessage.Properties.Add(prop);
// Complete and send within the same transaction.
msg.Complete();
if (shouldThrow)
throw new InvalidOperationException("Some error");
queue.Send(undeferredMessage);
// Complete the transaction.
ts.Complete();
}
}
catch (Exception ex)
{
msg.Abandon();
}
if (shouldThrow)
{
msg = session.Receive(sequenceNumber);
Console.WriteLine(msg.MessageId + " should match: " + messageId);
}
else
{
try
{
msg = session.Receive(sequenceNumber);
}
catch (Exception ex)
{
Console.WriteLine("Message not found, transaction worked OK.");
}
}
Примечание: здесь я просто беру копию свойств. Имейте в виду, что вы можете скопировать тело и любую другую дополнительную информацию.