В теме Azure ServiceBus молча не удается доставить подписку

У меня очень неприятная проблема с использованием экземпляра Azure Service Bus. В частности, у меня есть тема, настроенная под названием "команды", которая через некоторое время прекратит корректное проксирование сообщений в свои подписки. Я наблюдал следующее поведение, которое оставило меня в моем нынешнем озадаченном состоянии:

  1. Это произойдет только после того, как тема существует какое-то время, как правило, в одночасье. Все будет работать вечером, но когда я вернусь утром, тема перестала функционировать, и кажется, что единственное решение - удалить и воссоздать ее.

  2. Как показано на рисунке ниже, тема, похоже, получает сообщения, так как я не получаю никаких исключений после отправки, а свойства "Количество активных сообщений" и "Размер в байтах" в теме увеличиваются. Он просто не отправляет сообщение в подписку: введите описание изображения здесь

  3. Я подумал, что, возможно, фильтры в моих подписках испортили ситуацию, поэтому, чтобы проверить это, я удалил эти подписки и создал новую через проводник с фильтром по умолчанию. После этого я отправил несколько новых сообщений через тему, но они все еще не были получены новой подпиской.

  4. У меня есть другие темы, работающие на служебной шине (например, "события" на изображении выше), но они, похоже, не демонстрируют такое же поведение. Они настроены идентичным образом, но работают просто отлично.

Я открыт для любых теорий о том, что может быть причиной такого странного поведения. Я рад предоставить дополнительную информацию, если это поможет решить эту проблему.

Блоки кода:

Создать тему:

private async Task<bool> CreateTopicAsync(NamespaceManager namespaceManager, string topicName, CancellationToken cancel, TimeSpan maxWaitTime)
        {
            var retVal = false;
            var maxTimeToCreateTopic = DateTime.UtcNow + maxWaitTime;

            while (!cancel.IsCancellationRequested && DateTime.UtcNow < maxTimeToCreateTopic)
            {
                try
                {
                    await namespaceManager.CreateTopicAsync(new TopicDescription(topicName)
                    {
                        EnableBatchedOperations = true,
                        EnableFilteringMessagesBeforePublishing = true
                    });
                    retVal = true;
                    break;
                }
                catch (Exception ex)
                {
                    LogError("Exception thrown when creating topic: {0}", ex);
                }

                if (!retVal)
                {
                    LogWarning("Topic still does not exist, pausing and then retrying creation.");
                    await Task.Delay(_delayMs);
                }
            }

            return retVal;
        }

Создать подписку:

private async Task<bool> CreateSubscriptionAsync(NamespaceManager namespaceManager, string topicName, string subscriptionName, string filter, CancellationToken cancel, TimeSpan maxWaitTime)
        {
            var retVal = false;
            var maxTimeToCreateSubscription = DateTime.UtcNow + maxWaitTime;

            while (!cancel.IsCancellationRequested && DateTime.UtcNow < maxTimeToCreateSubscription)
            {
                try
                {
                    if (string.IsNullOrEmpty(filter))
                    {
                        namespaceManager.CreateSubscription(topicName, subscriptionName);
                    }
                    else
                    {
                        namespaceManager.CreateSubscription(topicName, subscriptionName, new SqlFilter(filter));
                    }
                    retVal = true;
                    break;
                }
                catch (Exception ex)
                {
                    LogError("Exception thrown when creating subscription: {0}", ex);
                }

                LogWarning("Subscription still does not exist, pausing and then retrying creation.");
                await Task.Delay(_delayMs);
            }

            return retVal;
        }

Отправить сообщения в тему:

BrokeredMessage brokeredMessage = null;
                    try
                    {
                        var type = nextMessage.GetType().AssemblyQualifiedName;
                        var jsonString = JsonConvert.SerializeObject(nextMessage);
                        var jsonStream = jsonString.ToStream();

                        brokeredMessage = new BrokeredMessage(jsonStream, true);
                        brokeredMessage.Properties["__messageType__"] = type;
                        if (nextData.Properties != null && nextData.Properties.Count > 0)
                        {
                            foreach (var prop in nextData.Properties)
                            {
                                brokeredMessage.Properties.Add(prop.Key, prop.Value);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        LogError("Exception thrown when creating brokered message: {0}", ex);

                        brokeredMessage = null;
                    }

                    if (brokeredMessage != null)
                    {
                        var messageSentSuccessfully = false;
                        try
                        {
                            await client.SendAsync(brokeredMessage);
                            numConsecutiveFailures = 0;
                            messageSentSuccessfully = true;
                        }
                        catch (Exception ex)
                        {
                            numConsecutiveFailures++;
                            LogError("Exception thrown from SendAsync: {0}. Fail count is {1}.", ex, numConsecutiveFailures);
                            await Task.Delay(_delayMs);
                        }
                    }

Переданный в теме клиент просто создается с помощью метода TopicClient.CreateFromConnectionString.

Получать сообщения от подписки:

private async Task ReceiveLoopAsync(SubscriptionClient client, CancellationToken cancel, TimeSpan maxReceiveWaitTime)
        {
            var numConsecutiveFailures = 0;
            var maxConsecutiveFailures = 5;

            while (!cancel.IsCancellationRequested && numConsecutiveFailures < maxConsecutiveFailures)
            {
                BrokeredMessage newMsg = null;

                try
                {
                    newMsg = await client.ReceiveAsync(maxReceiveWaitTime);
                    numConsecutiveFailures = 0;
                }
                catch (Exception ex)
                {
                    numConsecutiveFailures++;
                    LogError("Exception thrown from ReceiveAsync: {0}. Fail count is {1}.", ex, numConsecutiveFailures);
                    await Task.Delay(_delayMs);
                }

                // newMsg will be null if there were no messages to process after the allotted timeout expired.
                if (newMsg != null)
                {
                    // Just a function call.
                    _onMessageReceived?.Invoke(newMsg);
                }

                //LogDebug("Bottom of Receive");
            }

            //LogDebug("Exit Receive");
        }

Переданный в подписке клиент просто создается с помощью метода SubscriptionClient.CreateFromConnectionString.

0 ответов

Другие вопросы по тегам