Служебная шина - получение сообщения из сеанса по порядковому номеру

В настоящее время я пытаюсь получить конкретное сообщение из сеанса.

Для этого я использую хочу использовать. Получите (Int64) на MessageSession, где я передаю порядковый номер сообщения.

Вот мой код -

long msgSequenceNr = 1337;

QueueClient queueClient = QueueClient.CreateFromConnectionString(Constants.ServiceBusConnectionString, Constants.TestQueueEntityName, ReceiveMode.PeekLock);
MessageSession msgSession = queueClient.AcceptMessageSession(Constants.TestSessionId);

var peekedMsg = msgSession.Peek(msgSequenceNr); // <-- Works fine!
var receivedMsg = msgSession.Receive(msgSequenceNr); // <-- MessageNotFoundException

К сожалению, получение приведет к MessageNotFoundException, в то время как Peek работает нормально. Это ограничение, которое я пропустил, или есть другой способ добиться этого.

Обратите внимание, что в сеансе может быть несколько сообщений.

1 ответ

Решение

Прием с SequenceNumber может использоваться только в сочетании с методом Defer. Вот как вы бы это реализовали:

  1. Сообщение получено, но оно не может быть обработано прямо сейчас (возможно, оно ожидает завершения другого процесса).
  2. Сохранить номер последовательности в каком-то постоянном хранилище (хранилище таблиц, база данных SQL, ...)
  3. Когда вы знаете, что обработка может продолжаться (например, зависимый процесс завершен), загрузите все порядковые номера из постоянного хранилища.
  4. Используйте Receive(int sequenceNumber) или ReceiveBatch(int[] sequenceNumbers) для получения и обработки ваших отложенных сообщений.

Пример приложения: https://code.msdn.microsoft.com/windowsazure/Brokered-Messaging-ccc4f879

Обновить:

Форма вашего комментария Я заметил, что "отстранение" отложенного сообщения может быть решением. Вот некоторый пример кода для открепления сообщения, которое копирует отложенное сообщение в новое сообщение, завершает отложенное сообщение и отправляет новое сообщение обратно в очередь. Это использует TransactionScope для транзакционного завершения и повторной отправки сообщения, чтобы избежать риска потери сообщения:

    var messageId = "12434539828282";

    // Send.
    var msg = new BrokeredMessage {SessionId = "user1", MessageId = messageId };
    msg.Properties.Add("Language", "Dutch");
    queue.Send(msg);

    // Receive.
    var session = queue.AcceptMessageSession();
    msg = session.Receive();

    // Store the sequence number.
    var sequenceNumber = msg.SequenceNumber;

    // Defer.
    msg.Defer();

    // Change to true to test if the transaction worked.
    var shouldThrow = false;

    // Later processing of deferred message.
    msg = session.Receive(sequenceNumber);

    try
    {
        using (var ts = new TransactionScope())
        {
            // Create a new message.
            var undeferredMessage = new BrokeredMessage {SessionId = msg.SessionId, MessageId = msg.MessageId};
            foreach (var prop in msg.Properties)
                undeferredMessage.Properties.Add(prop);

            // Complete and send within the same transaction.
            msg.Complete();
            if (shouldThrow)
                throw new InvalidOperationException("Some error");
            queue.Send(undeferredMessage);

            // Complete the transaction.
            ts.Complete();
        }
    }
    catch (Exception ex)
    {
        msg.Abandon();
    }

    if (shouldThrow)
    {
        msg = session.Receive(sequenceNumber);
        Console.WriteLine(msg.MessageId + " should match: " + messageId);
    }
    else
    {
        try
        {
            msg = session.Receive(sequenceNumber);
        }
        catch (Exception ex)
        {
            Console.WriteLine("Message not found, transaction worked OK.");
        }
    }

Примечание: здесь я просто беру копию свойств. Имейте в виду, что вы можете скопировать тело и любую другую дополнительную информацию.

Другие вопросы по тегам