Количество доставок сообщений служебной шины Azure не увеличивается или сбрасывается при отключении / включении подписки на тему
У меня есть следующий рабочий процесс:
- Сервисная шина принимает сообщения.
- Функция Azure запускает и пытается доставить эти сообщения через HTTP какой-либо службе.
- Если доставка не удалась - функция выдает исключение (пользовательское) и отключает подписку на тему с помощью кода ниже:
- Другая функция параллельно отправляет эхо-запрос специальной конечной точке проверки работоспособности службы, и, если она получает 200 - она пытается включить подписку и снова запустить поток.
- Шаги могут быть воспроизведены N раз, потому что проверка работоспособности вернет 200, таким образом, адрес доставки кода пункта 2 - 4xx.
После следующей попытки включить подписку и доставить сообщение, я ожидаю, что количество доставок будет увеличено, и в конце (после 10 попыток доставки) оно попадет в тупик. Фактически - это равно 1.
Я предполагаю, что он может сброситься, когда я вызываю CreateOrUpdate с измененным статусом. Если да - каким другим способом можно управлять статусом подписки вместо пакета Microsoft.Azure.Management, чтобы счетчик доставки сообщений не сбрасывался?
ОБНОВЛЕНИЕ: Код функции
public static class ESBTESTSubscriptionTrigger
{
private static readonly HttpClient Client = new HttpClient();
private static IDatabase redisCache;
[FunctionName("ESBTESTSubscriptionTrigger")]
[Singleton]
public static async Task Run([ServiceBusTrigger("Notifications", "ESBTEST", AccessRights.Listen, Connection = "NotificationsBusConnectionString")]BrokeredMessage serviceBusMessage, TraceWriter log, [Inject]IKeyVaultSecretsManager keyVaultSecretsManager)
{
var logicAppUrl = await keyVaultSecretsManager.GetSecretAsync("NotificationsLogicAppUrl");
if (redisCache == null)
{
redisCache = RedisCacheConnectionManager.GetRedisCacheConnection(
keyVaultSecretsManager.GetSecretAsync("RedisCacheConnectionString").GetAwaiter().GetResult());
}
if (string.IsNullOrWhiteSpace(logicAppUrl))
{
log.Error("Logic App URL should be provided in Application settings of function App.");
throw new ParameterIsMissingException("Logic App URL should be provided in Application settings of function App.");
}
var applicaitonId = serviceBusMessage.Properties["applicationId"].ToString();
var eventName = serviceBusMessage.Properties.ContainsKey("Event-Name") ? serviceBusMessage.Properties["Event-Name"].ToString() : string.Empty;
if (string.IsNullOrWhiteSpace(applicaitonId))
{
log.Error("ApplicationId should be present in service bus message properties.");
throw new ParameterIsMissingException("Application id is missing in service bus message.");
}
Stream stream = serviceBusMessage.GetBody<Stream>();
StreamReader reader = new StreamReader(stream);
string s = reader.ReadToEnd();
var content = new StringContent(s, Encoding.UTF8, "application/json");
content.Headers.Add("ApplicationId", applicaitonId);
HttpResponseMessage response;
try
{
response = await Client.PostAsync(logicAppUrl, content);
}
catch (HttpRequestException e)
{
log.Error($"Logic App responded with {e.Message}");
throw new LogicAppBadRequestException($"Logic App responded with {e.Message}", e);
}
if (!response.IsSuccessStatusCode)
{
log.Error($"Logic App responded with {response.StatusCode}");
var serviceBusSubscriptionsSwitcherUrl = await keyVaultSecretsManager.GetSecretAsync("ServiceBusTopicSubscriptionSwitcherUri");
var sbSubscriptionSwitcherResponse = await Client.SendAsync(
new HttpRequestMessage(HttpMethod.Post, serviceBusSubscriptionsSwitcherUrl)
{
Content =
new
StringContent(
$"{{\"Action\":\"Disable\",\"SubscriptionName\":\"{applicaitonId}\"}}",
Encoding.UTF8,
"application/json")
});
if (sbSubscriptionSwitcherResponse.IsSuccessStatusCode == false)
{
throw new FunctionNotAvailableException($"ServiceBusTopicSubscriptionSwitcher responded with {sbSubscriptionSwitcherResponse.StatusCode}");
}
throw new LogicAppBadRequestException($"Logic App responded with {response.StatusCode}");
}
if (!string.IsNullOrWhiteSpace(eventName))
{
redisCache.KeyDelete($"{applicaitonId}{eventName}DeliveryErrorEmailSent");
}
}
}