Количество доставок сообщений служебной шины Azure не увеличивается или сбрасывается при отключении / включении подписки на тему

У меня есть следующий рабочий процесс:

  1. Сервисная шина принимает сообщения.
  2. Функция Azure запускает и пытается доставить эти сообщения через HTTP какой-либо службе.
  3. Если доставка не удалась - функция выдает исключение (пользовательское) и отключает подписку на тему с помощью кода ниже:

введите описание изображения здесь

  1. Другая функция параллельно отправляет эхо-запрос специальной конечной точке проверки работоспособности службы, и, если она получает 200 - она ​​пытается включить подписку и снова запустить поток.
  2. Шаги могут быть воспроизведены N раз, потому что проверка работоспособности вернет 200, таким образом, адрес доставки кода пункта 2 - 4xx.

После следующей попытки включить подписку и доставить сообщение, я ожидаю, что количество доставок будет увеличено, и в конце (после 10 попыток доставки) оно попадет в тупик. Фактически - это равно 1.

введите описание изображения здесь

Я предполагаю, что он может сброситься, когда я вызываю CreateOrUpdate с измененным статусом. Если да - каким другим способом можно управлять статусом подписки вместо пакета Microsoft.Azure.Management, чтобы счетчик доставки сообщений не сбрасывался?

ОБНОВЛЕНИЕ: Код функции

public static class ESBTESTSubscriptionTrigger
{
    private static readonly HttpClient Client = new HttpClient();

    private static IDatabase redisCache;

    [FunctionName("ESBTESTSubscriptionTrigger")]
    [Singleton]
    public static async Task Run([ServiceBusTrigger("Notifications", "ESBTEST", AccessRights.Listen, Connection = "NotificationsBusConnectionString")]BrokeredMessage serviceBusMessage, TraceWriter log, [Inject]IKeyVaultSecretsManager keyVaultSecretsManager)
    {
        var logicAppUrl = await keyVaultSecretsManager.GetSecretAsync("NotificationsLogicAppUrl");

        if (redisCache == null)
        {
            redisCache = RedisCacheConnectionManager.GetRedisCacheConnection(
                keyVaultSecretsManager.GetSecretAsync("RedisCacheConnectionString").GetAwaiter().GetResult());
        }

        if (string.IsNullOrWhiteSpace(logicAppUrl))
        {
            log.Error("Logic App URL should be provided in Application settings of function App.");
            throw new ParameterIsMissingException("Logic App URL should be provided in Application settings of function App.");
        }

        var applicaitonId = serviceBusMessage.Properties["applicationId"].ToString();
        var eventName = serviceBusMessage.Properties.ContainsKey("Event-Name") ? serviceBusMessage.Properties["Event-Name"].ToString() : string.Empty;
        if (string.IsNullOrWhiteSpace(applicaitonId))
        {
            log.Error("ApplicationId should be present in service bus message properties.");
            throw new ParameterIsMissingException("Application id is missing in service bus message.");
        }

        Stream stream = serviceBusMessage.GetBody<Stream>();
        StreamReader reader = new StreamReader(stream);
        string s = reader.ReadToEnd();

        var content = new StringContent(s, Encoding.UTF8, "application/json");
        content.Headers.Add("ApplicationId", applicaitonId);

        HttpResponseMessage response;
        try
        {
            response = await Client.PostAsync(logicAppUrl, content);
        }
        catch (HttpRequestException e)
        {
            log.Error($"Logic App responded with {e.Message}");
            throw new LogicAppBadRequestException($"Logic App responded with {e.Message}", e);
        }

        if (!response.IsSuccessStatusCode)
        {
            log.Error($"Logic App responded with {response.StatusCode}");

            var serviceBusSubscriptionsSwitcherUrl = await keyVaultSecretsManager.GetSecretAsync("ServiceBusTopicSubscriptionSwitcherUri");
            var sbSubscriptionSwitcherResponse = await Client.SendAsync(
                                                     new HttpRequestMessage(HttpMethod.Post, serviceBusSubscriptionsSwitcherUrl)
                                                         {
                                                             Content =
                                                                 new
                                                                     StringContent(
                                                                         $"{{\"Action\":\"Disable\",\"SubscriptionName\":\"{applicaitonId}\"}}",
                                                                         Encoding.UTF8,
                                                                         "application/json")
                                                         });

            if (sbSubscriptionSwitcherResponse.IsSuccessStatusCode == false)
            {
                throw new FunctionNotAvailableException($"ServiceBusTopicSubscriptionSwitcher responded with {sbSubscriptionSwitcherResponse.StatusCode}");
            }

            throw new LogicAppBadRequestException($"Logic App responded with {response.StatusCode}");
        }

        if (!string.IsNullOrWhiteSpace(eventName))
        {
            redisCache.KeyDelete($"{applicaitonId}{eventName}DeliveryErrorEmailSent");
        }
    }
}

0 ответов

Другие вопросы по тегам