.NET асинхронный MSMQ

Я не понимаю, где это идет не так. По сути, у меня есть программа, которая получает из очереди сообщений и обрабатывает сообщения. Программа может быть остановлена ​​в любое время, и в этом случае цикл обработки сообщений завершает свою работу до выхода из программы. Я пытаюсь сделать это с помощью следующего кода:

private MessageQueue q;
private ManualResetEventSlim idle;

public void Start()
{
    idle = new ManualResetEventSlim();
    q.ReceiveCompleted += this.MessageQueue_ReceiveCompleted;    
    q.BeginReceive();
}    

public void Stop()
{ 
    this.q.Dispose();
    this.idle.Wait();    
}

private void MessageQueue_ReceiveCompleted(object sender, 
    ReceiveCompletedEventArgs e)
{
    Message inMsg;
    try
    {
        inMsg = e.Message;
    }
    catch (Exception ex)
    {
        this.idle.Set();
        return;
    }

    // Handle message

    this.q.BeginReceive();
}

Как мы надеемся, метод Stop удаляет очередь сообщений, а затем ожидает установки дескриптора ожидания ожидания (что должно произойти, когда при получении будет вызвано событие ReceiveCompleted, но свойство e.Message должно быть исключением).

Тем не менее, цикл сообщений просто продолжается! Я удалил очередь сообщений, но ей все равно удается прочитать из нее, и обработчик исключений не вызывается, то есть строка idle.Wait ждет вечно.

Насколько я понимаю, удаление очереди сообщений ДОЛЖНО завершать любое ожидающее получение и вызывать событие, но e.Message (или q.EndReceive) должно вызвать исключение. Разве это не так? Если нет, как еще можно безопасно выйти из цикла сообщений?

Спасибо

ОБНОВИТЬ:

Вот полный пример (предполагается, что очередь существует)

class Program
{
    static MessageQueue mq;
    static ManualResetEventSlim idleWH;

    static void Main(string[] args)
    {
        idleWH = new ManualResetEventSlim();

        Console.WriteLine("Opening...");
        using (mq = new MessageQueue(@".\private$\test"))
        {
            mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(int) });
            mq.ReceiveCompleted += mq_ReceiveCompleted;

            for (int i = 0; i < 10000; ++i)
                mq.Send(i);

            Console.WriteLine("Begin Receive...");
            mq.BeginReceive();

            Console.WriteLine("Press ENTER to exit loop");
            Console.ReadLine();

            Console.WriteLine("Closing...");

            mq.Close();
        }

        Console.WriteLine("Waiting...");
        idleWH.Wait();

        Console.WriteLine("Press ENTER (ex)");
        //Console.ReadLine();
    }

    static void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        try
        {
            var msg = mq.EndReceive(e.AsyncResult);
            Console.Title = msg.Body.ToString();

            // Receive next message
            mq.BeginReceive();
        }
        catch (Exception ex)
        {
            idleWH.Set();
            return;
        }
    }
}

3 ответа

Решение

Единственный способ, которым я мог получить эту работу, был с транзакционной очередью. Любая нетранзакционная очередь кажется уязвимой для этого. Не ответ, но лучший совет, который я могу дать любому, кто найдет это.

Не совсем уверен, как вы сделали эту работу вообще. Вы должны вызвать MessageQueue.EndReceive() в событии. Только этот метод может вызвать исключение. Просмотрите пример кода MSDN для события ReceiveCompleted. И не улавливайте Исключение, которое просто вызывает неопровержимый сбой. Поймать конкретное исключение, которое вы получаете, когда вы располагаете очередь, ObjectDisposedException.

    private static volatile bool _shouldStop = false;

,,,

            _shouldStop = true;
            mq.Close();

,,,

        try
        {
            var msg = mq.EndReceive(e.AsyncResult);

            if ( _shouldStop)
            {
                idleWH.Set();
                return;
            }

            mq.BeginReceive();
        }

,,,

Другие вопросы по тегам