Интеграционный тест для публикации в одной теме и подписки на другую в Azure Service Bus ненадежен. Существует ли условие гонки?

Я пытаюсь написать интеграционный / приемочный тест для тестирования некоторого кода на лазурном, код в вопросе ATM просто подписывается на одну тему и публикуется в другой.

Я написал тест, но он не всегда проходит, кажется, что может быть условие гонки. Я попытался написать его несколькими способами, включая использование OnMessage, а также использование Receive (пример, который я показываю здесь).

При использовании OnMessage тест всегда завершался преждевременно (около 30 секунд), что, я думаю, в любом случае означает, что он не подходит для этого теста.

Мой запрос, касающийся моего примера, в частности, я предполагал, что после создания подписки на целевую тему, что любое сообщение, отправленное на него, я смогу забрать с помощью Receive(), в любой момент времени это сообщение означает, если сообщение приходит в целевой теме, прежде чем я вызову Receive(), я все равно смогу прочитать сообщение позже, вызвав Receive(). Может ли кто-нибудь пролить свет на это?

    namespace somenamespace {
    [TestClass]
    public class SampleTopicTest
    {
        private static TopicClient topicClient;
        private static SubscriptionClient subClientKoEligible;
        private static SubscriptionClient subClientKoIneligible;

        private static OnMessageOptions options;
        public const string TEST_MESSAGE_SUB = "TestMessageSub";
        private static NamespaceManager namespaceManager;

        private static string topicFleKoEligible;
        private static string topicFleKoIneligible;

        private BrokeredMessage message;

        [ClassInitialize]
        public static void BeforeClass(TestContext testContext)
        {
            //client for publishing messages
            string connectionString = ConfigurationManager.AppSettings["ServiceBusConnectionString"];
            string topicDataReady = ConfigurationManager.AppSettings["DataReadyTopicName"];
            topicClient = TopicClient.CreateFromConnectionString(connectionString, topicDataReady);

            topicFleKoEligible = ConfigurationManager.AppSettings["KnockOutEligibleTopicName"];
            topicFleKoIneligible = ConfigurationManager.AppSettings["KnockOutIneligibleTopicName"];

            //create test subscription to receive messages
            namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);


            if (!namespaceManager.SubscriptionExists(topicFleKoEligible, TEST_MESSAGE_SUB))
            {
                namespaceManager.CreateSubscription(topicFleKoEligible, TEST_MESSAGE_SUB);
            }

            if (!namespaceManager.SubscriptionExists(topicFleKoIneligible, TEST_MESSAGE_SUB))
            {
                namespaceManager.CreateSubscription(topicFleKoIneligible, TEST_MESSAGE_SUB);
            }

            //subscriber client koeligible
            subClientKoEligible = SubscriptionClient.CreateFromConnectionString(connectionString, topicFleKoEligible, TEST_MESSAGE_SUB);

            subClientKoIneligible = SubscriptionClient.CreateFromConnectionString(connectionString, topicFleKoIneligible, TEST_MESSAGE_SUB);

            options = new OnMessageOptions()
            {
                AutoComplete = false,
                AutoRenewTimeout = TimeSpan.FromMinutes(1),

            };
        }

          [TestMethod]
        public void E2EPOCTopicTestLT50()
        {
            Random rnd = new Random();
            string customerId = rnd.Next(1, 49).ToString();

            FurtherLendingCustomer sentCustomer = new FurtherLendingCustomer { CustomerId = customerId };
            BrokeredMessage sentMessage = new BrokeredMessage(sentCustomer.ToJson());           
            sentMessage.CorrelationId = Guid.NewGuid().ToString();
            string messageId = sentMessage.MessageId;
            topicClient.Send(sentMessage);

            Boolean messageRead = false;

            //wait for message to arrive on the ko eligible queue
            while((message = subClientKoEligible.Receive(TimeSpan.FromMinutes(2))) != null){

                //read message
                string messageString = message.GetBody<String>();

                //Serialize
                FurtherLendingCustomer receivedCustomer =  JsonConvert.DeserializeObject<FurtherLendingCustomer>(messageString.Substring(messageString.IndexOf("{")));

                //assertion
                Assert.AreEqual(sentCustomer.CustomerId, receivedCustomer.CustomerId,"verify customer id");

                //pop message
                message.Complete();
                messageRead = true;

                //leave loop after processing one message
                break;
            }
            if (!messageRead)
                Assert.Fail("Didn't receive any message after 2 mins");

        }
    }
}

1 ответ

Как говорится в официальном документе о SubscriptionClient.Receive (TimeSpan):

Параметры serverWaitTime TimeSpan

Промежуток времени, в течение которого сервер ожидает получения сообщения, прежде чем оно истечет.

Этот API-интерфейс может возвращать значение Null, если операция превысила указанное время ожидания, или операции завершились успешно, но сообщений больше нет.

Согласно моему тесту, если сообщение отправлено в тему, а затем доставлено в вашу подписку в пределах вашего определенного serverWaitTime, вы можете получить сообщение независимо от того, поступило ли сообщение в целевую тему до или после вашего вызова Receive,

При использовании OnMessage тест всегда завершался преждевременно (около 30 секунд), что, я думаю, в любом случае означает, что он не подходит для этого теста.

[TestMethod]
public void ReceiveMessages()
{
    subClient.OnMessage(msg => {
        System.Diagnostics.Trace.TraceInformation($"{DateTime.Now}:{msg.GetBody<string>()}");
        msg.Complete();
    });
    Task.Delay(TimeSpan.FromMinutes(5)).Wait();
}

Для клиента подписки. В сообщении я предположил, что это в основном вызов цикла Receive, После звонка OnMessage, вам нужно подождать некоторое время и остановить этот метод для выхода. Вот блог о программировании сообщений на основе событий для Windows Azure Service Bus, вы можете обратиться сюда.

Кроме того, я обнаружил, что ваш topicClient для отправки сообщений и subClientKoEligible для получения сообщения не ориентированы на один и тот же путь темы.

Другие вопросы по тегам