Сообщение из очереди сервисной шины исчезает при ошибке в функции активности
Я разработал приложение Azure Durable Functions, которое запускает новые сообщения очереди служебной шины. Он работает нормально, когда ошибок не возникает, но когда происходит ошибка в функции активности, он регистрирует, что произошел сбой, но сообщение навсегда ушло из очереди. Что может быть причиной этого, и как я могу предотвратить исчезновение сообщения из очереди при ошибке?
Вот воспроизводимый код, это код, сгенерированный из нового шаблона функции Azure в VS2017, только исключение добавляется, когда городом является "Сиэтл", и это ServicebusTrigger вместо HttpTrigger.
[FunctionName("Test")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context)
{
var outputs = new List<string>();
// Replace "hello" with the name of your Durable Activity Function.
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Tokyo"));
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Seattle"));
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "London"));
// returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
return outputs;
}
[FunctionName("Test_Hello")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
log.LogInformation($"Saying hello to {name}.");
if (name == "Seattle")
throw new Exception("An error occurs");
return $"Hello {name}!";
}
[FunctionName("Test_HttpStart")]
public static async Task ServiceBusStart(
[ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
[OrchestrationClient]DurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
string instanceId = await starter.StartNewAsync("Test", msg);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
}
Обновление: когда у меня есть исключение в клиентской функции Orchestration, оно делает правильную вещь, например, повторяет попытку и помещает сообщение в очередь недоставленных сообщений, если повторная попытка не удалась x раз.
Таким образом, мне удалось обойти это, обновив клиентскую функцию с помощью цикла while, проверяя состояние сбоя / прекращения / отмены.
[FunctionName("Test_HttpStart")]
public static async Task ServiceBusStart(
[ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
[OrchestrationClient]DurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
string instanceId = await starter.StartNewAsync("Test", msg);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
var status = await starter.GetStatusAsync(instanceId);
while (status.RuntimeStatus != OrchestrationRuntimeStatus.Completed)
{
System.Threading.Thread.Sleep(1000);
status = await starter.GetStatusAsync(instanceId);
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Failed
|| status.RuntimeStatus == OrchestrationRuntimeStatus.Terminated
|| status.RuntimeStatus == OrchestrationRuntimeStatus.Canceled)
{
throw new Exception("Orchestration failed with error: " + status.Output);
}
}
}
Однако это кажется мне хаком, и я не видел такого типа кода ни в одном примере кода MS. Я полагаю, что об этом должна заботиться структура надежных функций. Есть ли другой способ заставить триггер служебной шины работать в надежных функциях?
2 ответа
Такое поведение является особенностью. Начало оркестровки асинхронно - т.е. StartNewAsync
API не будет автоматически ожидать запуска или завершения оркестровки. Внутренне StartNewAsync
просто помещает сообщение в очередь хранилища Azure и записывает запись в таблицу хранилища Azure. Если это произойдет успешно, ваша функция служебной шины продолжит работу и завершится успешно, после чего сообщение будет удалено.
Ваш обходной путь приемлем, если вам действительно нужно повторить сообщение из очереди служебной шины, но я спрашиваю, зачем вам это нужно. Сама оркестровка может самостоятельно управлять повторными попытками, не полагаясь на служебную шину. Например, вы можете использовать CallActivityWithRetryAsync для внутренней повторной попытки внутри оркестровки.
См. Раздел " Обработка ошибок" в документации по долговременным функциям.
Я знаю, что это старая тема, но я хотел поделиться тем, как я заработал с и
WaitForCompletionOrCreateCheckStatusResponseAsync
.
[FunctionName(nameof(QueueTriggerFunction))]
public async Task QueueTriggerFunction(
[ServiceBusTrigger("queue-name", Connection = "connectionstring-key")]string queueMessage,
MessageReceiver messageReceiver,
string lockToken,
string messageId,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
//note: autocomplete is disabled
try
{
//start durable function
var instanceId = await starter.StartNewAsync(nameof(OrchestratorFunction), queueMessage);
//get the payload (we want to use the status uri)
var payload = starter.CreateHttpManagementPayload(instanceId);
//instruct QueueTriggerFunction to wait for response
await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(new HttpRequestMessage(HttpMethod.Get, payload.StatusQueryGetUri), instanceId);
//response ready, get status
var status = await starter.GetStatusAsync(instanceId);
//act on status
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
{
//like completing the message
await messageReceiver.CompleteAsync(lockToken);
log.LogInformation($"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: {nameof(OrchestratorFunction)} succeeded [MessageId={messageId}]");
}
else
{
//or deadletter the sob
await messageReceiver.DeadLetterAsync(lockToken);
log.LogError($"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: {nameof(OrchestratorFunction)} failed [MessageId={messageId}]");
}
}
catch (Exception ex)
{
//not sure what went wrong, let the lock expire and try again (until max retry attempts is reached)
log.LogError(ex, $"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: handler failed [MessageId={messageId}]");
}
}
Дело в том, что все примеры в Интернете используют
HttpTrigger
и используйте httprequest этого триггера для проверки завершения, но у вас нет этого с
ServiceBusTrigger
. Более того, я не думаю, что это правильно, и вы должны использовать статус uri из вызова полезной нагрузки, как я делаю здесь с instanceId функции оркестратора.