Интеграционный тест для публикации в одной теме и подписки на другую в Azure Service Bus ненадежен. Существует ли условие гонки?
Я пытаюсь написать интеграционный / приемочный тест для тестирования некоторого кода на лазурном, код в вопросе ATM просто подписывается на одну тему и публикуется в другой.
Я написал тест, но он не всегда проходит, кажется, что может быть условие гонки. Я попытался написать его несколькими способами, включая использование OnMessage, а также использование Receive (пример, который я показываю здесь).
При использовании OnMessage тест всегда завершался преждевременно (около 30 секунд), что, я думаю, в любом случае означает, что он не подходит для этого теста.
Мой запрос, касающийся моего примера, в частности, я предполагал, что после создания подписки на целевую тему, что любое сообщение, отправленное на него, я смогу забрать с помощью Receive(), в любой момент времени это сообщение означает, если сообщение приходит в целевой теме, прежде чем я вызову Receive(), я все равно смогу прочитать сообщение позже, вызвав Receive(). Может ли кто-нибудь пролить свет на это?
namespace somenamespace {
[TestClass]
public class SampleTopicTest
{
private static TopicClient topicClient;
private static SubscriptionClient subClientKoEligible;
private static SubscriptionClient subClientKoIneligible;
private static OnMessageOptions options;
public const string TEST_MESSAGE_SUB = "TestMessageSub";
private static NamespaceManager namespaceManager;
private static string topicFleKoEligible;
private static string topicFleKoIneligible;
private BrokeredMessage message;
[ClassInitialize]
public static void BeforeClass(TestContext testContext)
{
//client for publishing messages
string connectionString = ConfigurationManager.AppSettings["ServiceBusConnectionString"];
string topicDataReady = ConfigurationManager.AppSettings["DataReadyTopicName"];
topicClient = TopicClient.CreateFromConnectionString(connectionString, topicDataReady);
topicFleKoEligible = ConfigurationManager.AppSettings["KnockOutEligibleTopicName"];
topicFleKoIneligible = ConfigurationManager.AppSettings["KnockOutIneligibleTopicName"];
//create test subscription to receive messages
namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
if (!namespaceManager.SubscriptionExists(topicFleKoEligible, TEST_MESSAGE_SUB))
{
namespaceManager.CreateSubscription(topicFleKoEligible, TEST_MESSAGE_SUB);
}
if (!namespaceManager.SubscriptionExists(topicFleKoIneligible, TEST_MESSAGE_SUB))
{
namespaceManager.CreateSubscription(topicFleKoIneligible, TEST_MESSAGE_SUB);
}
//subscriber client koeligible
subClientKoEligible = SubscriptionClient.CreateFromConnectionString(connectionString, topicFleKoEligible, TEST_MESSAGE_SUB);
subClientKoIneligible = SubscriptionClient.CreateFromConnectionString(connectionString, topicFleKoIneligible, TEST_MESSAGE_SUB);
options = new OnMessageOptions()
{
AutoComplete = false,
AutoRenewTimeout = TimeSpan.FromMinutes(1),
};
}
[TestMethod]
public void E2EPOCTopicTestLT50()
{
Random rnd = new Random();
string customerId = rnd.Next(1, 49).ToString();
FurtherLendingCustomer sentCustomer = new FurtherLendingCustomer { CustomerId = customerId };
BrokeredMessage sentMessage = new BrokeredMessage(sentCustomer.ToJson());
sentMessage.CorrelationId = Guid.NewGuid().ToString();
string messageId = sentMessage.MessageId;
topicClient.Send(sentMessage);
Boolean messageRead = false;
//wait for message to arrive on the ko eligible queue
while((message = subClientKoEligible.Receive(TimeSpan.FromMinutes(2))) != null){
//read message
string messageString = message.GetBody<String>();
//Serialize
FurtherLendingCustomer receivedCustomer = JsonConvert.DeserializeObject<FurtherLendingCustomer>(messageString.Substring(messageString.IndexOf("{")));
//assertion
Assert.AreEqual(sentCustomer.CustomerId, receivedCustomer.CustomerId,"verify customer id");
//pop message
message.Complete();
messageRead = true;
//leave loop after processing one message
break;
}
if (!messageRead)
Assert.Fail("Didn't receive any message after 2 mins");
}
}
}
1 ответ
Как говорится в официальном документе о SubscriptionClient.Receive (TimeSpan):
Параметры serverWaitTime TimeSpan
Промежуток времени, в течение которого сервер ожидает получения сообщения, прежде чем оно истечет.
Этот API-интерфейс может возвращать значение Null, если операция превысила указанное время ожидания, или операции завершились успешно, но сообщений больше нет.
Согласно моему тесту, если сообщение отправлено в тему, а затем доставлено в вашу подписку в пределах вашего определенного serverWaitTime, вы можете получить сообщение независимо от того, поступило ли сообщение в целевую тему до или после вашего вызова Receive
,
При использовании OnMessage тест всегда завершался преждевременно (около 30 секунд), что, я думаю, в любом случае означает, что он не подходит для этого теста.
[TestMethod]
public void ReceiveMessages()
{
subClient.OnMessage(msg => {
System.Diagnostics.Trace.TraceInformation($"{DateTime.Now}:{msg.GetBody<string>()}");
msg.Complete();
});
Task.Delay(TimeSpan.FromMinutes(5)).Wait();
}
Для клиента подписки. В сообщении я предположил, что это в основном вызов цикла Receive
, После звонка OnMessage
, вам нужно подождать некоторое время и остановить этот метод для выхода. Вот блог о программировании сообщений на основе событий для Windows Azure Service Bus, вы можете обратиться сюда.
Кроме того, я обнаружил, что ваш topicClient
для отправки сообщений и subClientKoEligible
для получения сообщения не ориентированы на один и тот же путь темы.