В теме Azure ServiceBus молча не удается доставить подписку
У меня очень неприятная проблема с использованием экземпляра Azure Service Bus. В частности, у меня есть тема, настроенная под названием "команды", которая через некоторое время прекратит корректное проксирование сообщений в свои подписки. Я наблюдал следующее поведение, которое оставило меня в моем нынешнем озадаченном состоянии:
Это произойдет только после того, как тема существует какое-то время, как правило, в одночасье. Все будет работать вечером, но когда я вернусь утром, тема перестала функционировать, и кажется, что единственное решение - удалить и воссоздать ее.
Как показано на рисунке ниже, тема, похоже, получает сообщения, так как я не получаю никаких исключений после отправки, а свойства "Количество активных сообщений" и "Размер в байтах" в теме увеличиваются. Он просто не отправляет сообщение в подписку:
Я подумал, что, возможно, фильтры в моих подписках испортили ситуацию, поэтому, чтобы проверить это, я удалил эти подписки и создал новую через проводник с фильтром по умолчанию. После этого я отправил несколько новых сообщений через тему, но они все еще не были получены новой подпиской.
У меня есть другие темы, работающие на служебной шине (например, "события" на изображении выше), но они, похоже, не демонстрируют такое же поведение. Они настроены идентичным образом, но работают просто отлично.
Я открыт для любых теорий о том, что может быть причиной такого странного поведения. Я рад предоставить дополнительную информацию, если это поможет решить эту проблему.
Блоки кода:
Создать тему:
private async Task<bool> CreateTopicAsync(NamespaceManager namespaceManager, string topicName, CancellationToken cancel, TimeSpan maxWaitTime)
{
var retVal = false;
var maxTimeToCreateTopic = DateTime.UtcNow + maxWaitTime;
while (!cancel.IsCancellationRequested && DateTime.UtcNow < maxTimeToCreateTopic)
{
try
{
await namespaceManager.CreateTopicAsync(new TopicDescription(topicName)
{
EnableBatchedOperations = true,
EnableFilteringMessagesBeforePublishing = true
});
retVal = true;
break;
}
catch (Exception ex)
{
LogError("Exception thrown when creating topic: {0}", ex);
}
if (!retVal)
{
LogWarning("Topic still does not exist, pausing and then retrying creation.");
await Task.Delay(_delayMs);
}
}
return retVal;
}
Создать подписку:
private async Task<bool> CreateSubscriptionAsync(NamespaceManager namespaceManager, string topicName, string subscriptionName, string filter, CancellationToken cancel, TimeSpan maxWaitTime)
{
var retVal = false;
var maxTimeToCreateSubscription = DateTime.UtcNow + maxWaitTime;
while (!cancel.IsCancellationRequested && DateTime.UtcNow < maxTimeToCreateSubscription)
{
try
{
if (string.IsNullOrEmpty(filter))
{
namespaceManager.CreateSubscription(topicName, subscriptionName);
}
else
{
namespaceManager.CreateSubscription(topicName, subscriptionName, new SqlFilter(filter));
}
retVal = true;
break;
}
catch (Exception ex)
{
LogError("Exception thrown when creating subscription: {0}", ex);
}
LogWarning("Subscription still does not exist, pausing and then retrying creation.");
await Task.Delay(_delayMs);
}
return retVal;
}
Отправить сообщения в тему:
BrokeredMessage brokeredMessage = null;
try
{
var type = nextMessage.GetType().AssemblyQualifiedName;
var jsonString = JsonConvert.SerializeObject(nextMessage);
var jsonStream = jsonString.ToStream();
brokeredMessage = new BrokeredMessage(jsonStream, true);
brokeredMessage.Properties["__messageType__"] = type;
if (nextData.Properties != null && nextData.Properties.Count > 0)
{
foreach (var prop in nextData.Properties)
{
brokeredMessage.Properties.Add(prop.Key, prop.Value);
}
}
}
catch (Exception ex)
{
LogError("Exception thrown when creating brokered message: {0}", ex);
brokeredMessage = null;
}
if (brokeredMessage != null)
{
var messageSentSuccessfully = false;
try
{
await client.SendAsync(brokeredMessage);
numConsecutiveFailures = 0;
messageSentSuccessfully = true;
}
catch (Exception ex)
{
numConsecutiveFailures++;
LogError("Exception thrown from SendAsync: {0}. Fail count is {1}.", ex, numConsecutiveFailures);
await Task.Delay(_delayMs);
}
}
Переданный в теме клиент просто создается с помощью метода TopicClient.CreateFromConnectionString.
Получать сообщения от подписки:
private async Task ReceiveLoopAsync(SubscriptionClient client, CancellationToken cancel, TimeSpan maxReceiveWaitTime)
{
var numConsecutiveFailures = 0;
var maxConsecutiveFailures = 5;
while (!cancel.IsCancellationRequested && numConsecutiveFailures < maxConsecutiveFailures)
{
BrokeredMessage newMsg = null;
try
{
newMsg = await client.ReceiveAsync(maxReceiveWaitTime);
numConsecutiveFailures = 0;
}
catch (Exception ex)
{
numConsecutiveFailures++;
LogError("Exception thrown from ReceiveAsync: {0}. Fail count is {1}.", ex, numConsecutiveFailures);
await Task.Delay(_delayMs);
}
// newMsg will be null if there were no messages to process after the allotted timeout expired.
if (newMsg != null)
{
// Just a function call.
_onMessageReceived?.Invoke(newMsg);
}
//LogDebug("Bottom of Receive");
}
//LogDebug("Exit Receive");
}
Переданный в подписке клиент просто создается с помощью метода SubscriptionClient.CreateFromConnectionString.