Сообщение из очереди сервисной шины исчезает при ошибке в функции активности

Я разработал приложение Azure Durable Functions, которое запускает новые сообщения очереди служебной шины. Он работает нормально, когда ошибок не возникает, но когда происходит ошибка в функции активности, он регистрирует, что произошел сбой, но сообщение навсегда ушло из очереди. Что может быть причиной этого, и как я могу предотвратить исчезновение сообщения из очереди при ошибке?

Вот воспроизводимый код, это код, сгенерированный из нового шаблона функции Azure в VS2017, только исключение добавляется, когда городом является "Сиэтл", и это ServicebusTrigger вместо HttpTrigger.

            [FunctionName("Test")]
    public static async Task<List<string>> RunOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        var outputs = new List<string>();

        // Replace "hello" with the name of your Durable Activity Function.
        outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Tokyo"));
        outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Seattle"));
        outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "London"));

        // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
        return outputs;
    }

    [FunctionName("Test_Hello")]
    public static string SayHello([ActivityTrigger] string name, ILogger log)
    {
        log.LogInformation($"Saying hello to {name}.");
        if (name == "Seattle")
            throw new Exception("An error occurs");
        return $"Hello {name}!";
    }

    [FunctionName("Test_HttpStart")]
    public static async Task ServiceBusStart(
        [ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
        [OrchestrationClient]DurableOrchestrationClient starter,
        ILogger log)
    {
        // Function input comes from the request content.
        var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
        string instanceId = await starter.StartNewAsync("Test", msg);
        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
    }

Обновление: когда у меня есть исключение в клиентской функции Orchestration, оно делает правильную вещь, например, повторяет попытку и помещает сообщение в очередь недоставленных сообщений, если повторная попытка не удалась x раз.

Таким образом, мне удалось обойти это, обновив клиентскую функцию с помощью цикла while, проверяя состояние сбоя / прекращения / отмены.

    [FunctionName("Test_HttpStart")]
    public static async Task ServiceBusStart(
        [ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
        [OrchestrationClient]DurableOrchestrationClient starter,
        ILogger log)
    {
        // Function input comes from the request content.
        var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
        string instanceId = await starter.StartNewAsync("Test", msg);
        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

        var status = await starter.GetStatusAsync(instanceId);

        while (status.RuntimeStatus != OrchestrationRuntimeStatus.Completed)
        {
            System.Threading.Thread.Sleep(1000);
            status = await starter.GetStatusAsync(instanceId);
            if (status.RuntimeStatus == OrchestrationRuntimeStatus.Failed 
                || status.RuntimeStatus == OrchestrationRuntimeStatus.Terminated
                || status.RuntimeStatus == OrchestrationRuntimeStatus.Canceled)
            {
                throw new Exception("Orchestration failed with error: " + status.Output);
            }
        }

    }

Однако это кажется мне хаком, и я не видел такого типа кода ни в одном примере кода MS. Я полагаю, что об этом должна заботиться структура надежных функций. Есть ли другой способ заставить триггер служебной шины работать в надежных функциях?

2 ответа

Решение

Такое поведение является особенностью. Начало оркестровки асинхронно - т.е. StartNewAsync API не будет автоматически ожидать запуска или завершения оркестровки. Внутренне StartNewAsync просто помещает сообщение в очередь хранилища Azure и записывает запись в таблицу хранилища Azure. Если это произойдет успешно, ваша функция служебной шины продолжит работу и завершится успешно, после чего сообщение будет удалено.

Ваш обходной путь приемлем, если вам действительно нужно повторить сообщение из очереди служебной шины, но я спрашиваю, зачем вам это нужно. Сама оркестровка может самостоятельно управлять повторными попытками, не полагаясь на служебную шину. Например, вы можете использовать CallActivityWithRetryAsync для внутренней повторной попытки внутри оркестровки.

См. Раздел " Обработка ошибок" в документации по долговременным функциям.

Я знаю, что это старая тема, но я хотел поделиться тем, как я заработал с и WaitForCompletionOrCreateCheckStatusResponseAsync.

      [FunctionName(nameof(QueueTriggerFunction))]
public async Task QueueTriggerFunction(
    [ServiceBusTrigger("queue-name", Connection = "connectionstring-key")]string queueMessage,
    MessageReceiver messageReceiver,
    string lockToken,
    string messageId,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    //note: autocomplete is disabled
    try
    {
        //start durable function
        var instanceId = await starter.StartNewAsync(nameof(OrchestratorFunction), queueMessage);

        //get the payload (we want to use the status uri)
        var payload = starter.CreateHttpManagementPayload(instanceId);

        //instruct QueueTriggerFunction to wait for response
        await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(new HttpRequestMessage(HttpMethod.Get, payload.StatusQueryGetUri), instanceId);

        //response ready, get status
        var status = await starter.GetStatusAsync(instanceId);

        //act on status
        if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
        {
            //like completing the message
            await messageReceiver.CompleteAsync(lockToken);
            log.LogInformation($"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: {nameof(OrchestratorFunction)} succeeded [MessageId={messageId}]");
        }
        else
        {
            //or deadletter the sob
            await messageReceiver.DeadLetterAsync(lockToken);
            log.LogError($"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: {nameof(OrchestratorFunction)} failed [MessageId={messageId}]");
        }
    }
    catch (Exception ex)
    {
        //not sure what went wrong, let the lock expire and try again (until max retry attempts is reached)
        log.LogError(ex, $"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: handler failed [MessageId={messageId}]");
    }
}

Дело в том, что все примеры в Интернете используют HttpTriggerи используйте httprequest этого триггера для проверки завершения, но у вас нет этого с ServiceBusTrigger. Более того, я не думаю, что это правильно, и вы должны использовать статус uri из вызова полезной нагрузки, как я делаю здесь с instanceId функции оркестратора.