Azure ServiceBus OnMessage блокирует вызов или нет?

Мы используем очереди Azure ServiceBus для обработки больших объемов клиентских запросов. Тем не менее OnMessage вызов кажется блокирующим, однако блокировка этого вызова невероятно непоследовательна, если это действительно блокирующий вызов.

Я пытаюсь выполнить постоянное наблюдение за очередью из приложения веб-службы (чтобы можно было добывать метрики из запущенного приложения)

Я создаю подписку ниже:

protected virtual void Subscribe(string queueName, Func<QueueRequest, bool> callback)
{
    var client = GetClient(queueName, PollingTimeout);
    var transformCallback = new Action<BrokeredMessage>((message) =>
    {
        try
        {
            var request = message.ToQueueRequest();
            if (callback(request))
            {
                message.Complete();
            }
            else
            {
                message.Abandon();
            }
        }
        catch (Exception ex)
        {
            //TODO: Log the error
            message.Abandon();
        }
    });
    var options = new OnMessageOptions
    {
        MaxConcurrentCalls = _config.GetInt("MaxThreadsPerQueue"),
        AutoComplete = false
    };
    options.ExceptionReceived += OnMessageError;
    client.OnMessage(transformCallback, options);
}

Если у меня подписка вызывается только один раз, приложение перестает наблюдать за очередью и, таким образом, останавливает обработку сообщений. Однако, если я размещу цикл while вокруг моего звонка по подписке. Поэтому с большой неохотой я написал приведенный ниже фрагмент, чтобы переподписаться, если это OnMessage завершено.

protected void MonitorQueue()
{
    IsRunning = true;
    while (IsRunning)
    {
        try
        {
            Log.Info("MonitoringThread: OnMessage beginning logging for {0}", QueueName);
            QueueClient.Subscribe(QueueName, Processor);
            Log.Info("MonitoringThread:  OnMessage ended logging for {0}", QueueName);
        }
        catch (Exception ex)
        {
            IsRunning = false;
            Log.Error("MonitoringThread: Error in subscription for {0}: ", ex, QueueName);
        }

        if (SleepBeforeReinit > 0 && IsRunning)
        {
            Thread.Sleep(SleepBeforeReinit);
        }
    }
}

Это устранило проблему сообщений, истекающих до мертвой буквы из-за того, что они не были подобраны, однако это вызвало другие проблемы.

Поскольку OnMessage является оплачиваемой операцией, меня беспокоит, когда я вижу, что файл журнала сообщает мне, что очередь начинается и заканчивается с интервалом менее секунды, а размер моего файла журнала очень быстро увеличивается.

У меня есть мой MessagingFactory установить иметь OperationTimeout 1 дня, но, похоже, это не влияет на частоту открытия / закрытия подписок, как я и ожидал.

Я видел множество примеров, выполняющих эту роль в качестве рабочей роли, однако это не даст того, что мы пытаемся сделать. В настоящее время я связываю это с Global.asax.cs нашего веб-приложения. Любые советы высоко ценится!

2 ответа

Решение

OnMessage и OnMessageAsync НЕ блокируют вызовы. Их необходимо создать один раз и продолжать подписку на очередь до тех пор, пока приложение не будет закрыто.

Дополнительные сведения см. В соответствующей публикации: служебная шина Azure, определите, останавливает ли OnMessage обработку

Ваш первый подход верен, и значение MaxThreadsPerQueue определяет, сколько потоков доступно для обработки ваших сообщений. Звучит так, как будто эти темы израсходованы (может быть, обработка ваших сообщений обратным вызовом занимает некоторое время?).

Вместо этого вам следует рассмотреть возможность использования метода QueueClient.OnMessageAsync для получения ваших сообщений.

Другие вопросы по тегам