Функция Azure: как лучше реализовать задержку повторения сообщения очереди
Моя функция Azure должна прослушивать очередь сообщений, затем получать сообщение, пытаться вызвать внешнюю службу со значением в сообщении, если внешняя служба возвращает "ОК", то мы должны записать сообщение в другую очередь (для следующей функции Azure), если возвращает В случае неудачи мы должны вернуться в нашу текущую очередь и повторить попытку с помощью нашей функции Azure через 5 минут. Как это реализовать? Я сделал это с таймером, но решение не нравится мне:
[FunctionName("FunctionOffice365VerificateDomain_and_AddService_and_GexMxRecord")]
public async static Task Run([TimerTrigger("0 */5 * * * *")]TimerInfo myTimer,
[Queue("domain-verificate-Office365-add-services-get-mx-record", Connection = "StorageConnectionString")]CloudQueue listenQueue,
[Queue("domain-add-mx-record-to-registrator", Connection = "StorageConnectionString")]CloudQueue outputQueue,
ILogger log)
{
while (true)
{
// do "invisible" message for next 30 sec
var message = await listenQueue.GetMessageAsync();
if (message != null)
{
DomainForRegistration domainForRegistration = JsonConvert.DeserializeObject<DomainForRegistration>(message.AsString);
try
{
await _office365DomainService.VerifyDomainAsync(domainForRegistration.DomainName);
// remove message
await listenQueue.DeleteMessageAsync(message);
await _office365DomainService.UpdateIndicateSupportedServicesDomainAsync(domainForRegistration.DomainName);
var mxRecord = await _office365DomainService.GetMxRecordForDomainAsync(domainForRegistration.DomainName);
}
catch (DomainVerificationRecordNotFoundException)
{
// thrown when VerifyDomainAsync failed
}
}
else
break;
}
}
Как сделать это более аккуратно, без этих while(true)
, но с таймаутом после неудачной проверки?
1 ответ
Согласитесь с @DavidG, попробуйте использовать триггер очереди для достижения своей цели. W может положиться на настройку хоста Queue.
visibilityTimeout: интервал времени между повторными попытками при сбое обработки сообщения. maxDequeueCount - количество попыток обработки сообщения перед его перемещением в очередь нежелательных сообщений.
{
"version": "2.0",
"extensions": {
"queues": {
"visibilityTimeout" : "00:05:00",
"maxDequeueCount": 2,
}
}
}
Таким образом, функция должна выглядеть
public static async Task Run(
[QueueTrigger("domain-verificate-Office365-add-services-get-mx-record")]string myQueueItem, ILogger log,
[Queue("domain-add-mx-record-to-registrator", Connection = "StorageConnectionString")]IAsyncCollector<string> outputQueue
)
{
// do stuff then output message
await outputQueue.AddAsync(myQueueItem);
}
Если вы не хотите выдавать исключение на хост, мы можем обратиться к initialVisibilityDelay метода CloudQueue.
указав интервал времени, в течение которого сообщение будет невидимым
public static async Task Run(
[QueueTrigger("domain-verificate-Office365-add-services-get-mx-record")]string myQueueItem, ILogger log,
[Queue("domain-add-mx-record-to-registrator", Connection = "StorageConnectionString")]IAsyncCollector<string> outputQueue,
[Queue("domain-verificate-Office365-add-services-get-mx-record", Connection = "StorageConnectionString")]CloudQueue listenQueue
)
{
try
{
// do stuff then output message
await outputQueue.AddAsync(myQueueItem);
}
catch(DomainVerificationRecordNotFoundException)
{
// add the message in current queue and can only be visible after 5 minutes
await listenQueue.AddMessageAsync(new CloudQueueMessage(myQueueItem), null, TimeSpan.FromMinutes(5), null, null);
}
}