В запущенных очередях веб-заданий Azure можно ли изменить сообщение очереди хранения Azure после сбоя функции веб-задания, но до отравления?
У меня есть функции, запускаемые очередями, в моих веб-заданиях Azure. Нормальное поведение, конечно, это когда функция не работает MaxDequeueCount
раз сообщение помещается в соответствующую очередь яда. Я хотел бы изменить сообщение после ошибки, но до вставки ядовитой очереди. Пример:
Начальное сообщение:
{ "Name":"Tom", "Age", 30" }
И после сбоя я хочу изменить сообщение следующим образом и добавить измененное сообщение в очередь ядовитых сообщений:
{ "Name":"Tom", "Age", 30", "ErrorMessage":"Unable to find user" }
Можно ли это сделать?
2 ответа
Согласно документации Webjobs, сообщения будут помещены в очередь ядовитых после 5 неудачных попыток обработать сообщение:
SDK будет вызывать функцию до 5 раз для обработки сообщения очереди. Если пятая попытка не удалась, сообщение перемещается в очередь отравления. Максимальное количество повторов настраивается.
Источник: https://github.com/Azure/azure-webjobs-sdk/wiki/Queues
Это автоматическое поведение. Но вы все равно можете обрабатывать исключения в своем коде функции WebJobs (таким образом, исключение не покидает вашу функцию, и автоматическая обработка вредоносных сообщений не запускается) и помещать измененное сообщение в очередь, используя привязки вывода.
Другой вариант - проверить свойство dequeueCount, которое указывает, сколько раз пытались обработать сообщение.
Вы можете узнать, сколько раз сообщение было взято для обработки, добавив параметр int с именем dequeueCount в вашу функцию. Затем вы можете проверить счетчик очереди в коде функции и выполнить собственную обработку вредоносных сообщений, когда число превышает пороговое значение, как показано в следующем примере.
public static void CopyBlob(
[QueueTrigger("copyblobqueue")] string blobName, int dequeueCount,
[Blob("textblobs/{queueTrigger}", FileAccess.Read)] Stream blobInput,
[Blob("textblobs/{queueTrigger}-new", FileAccess.Write)] Stream blobOutput,
TextWriter logger)
{
if (dequeueCount > 3)
{
logger.WriteLine("Failed to copy blob, name=" + blobName);
}
else
{
blobInput.CopyTo(blobOutput, 4096);
}
}
(также взято по ссылке выше).
Ваша подпись функции может выглядеть так
public static void ProcessQueueMessage(
[QueueTrigger("myqueue")] CloudQueueMessage message,
[Queue("myqueue-poison")] CloudQueueMessage poisonMessage,
TextWriter logger)
Максимальное время повторения по умолчанию равно 5. Вы также можете установить это значение самостоятельно, используя свойство Queues.MaxDequeueCount
из JobHostConfiguration()
Например, код, как показано ниже:
static void Main(string[] args)
{
var config = new JobHostConfiguration();
config.Queues.MaxDequeueCount = 5; // set the maximum retry time
var host = new JobHost(config);
host.RunAndBlock();
}
Затем вы можете обновить сообщение об ошибке в очереди, когда достигнуто максимальное время повтора. Вы можете указать несуществующий контейнер BLOB-объектов для применения механизма повторных попыток. Код как ниже:
public static void ProcessQueueMessage([QueueTrigger("queue")] CloudQueueMessage message, [Blob("container/{queueTrigger}", FileAccess.Read)] Stream myBlob, ILogger logger)
{
string yourUpdatedString = "ErrorMessage" + ":" + "Unable to find user";
string str1 = message.AsString;
if (message.DequeueCount == 5) // here, the maximum retry time is set to 5
{
message.SetMessageContent(str1.Replace("}", "," + yourUpdatedString + "}")); // modify the failed message here
}
logger.LogInformation($"Blob name:{message} \n Size: {myBlob.Length} bytes");
}
Когда вышеупомянутое сделано, вы можете увидеть обновленное сообщение очереди в очереди-яд.
ОБНОВЛЕНО:
Поскольку CloudQueueMessage является запечатанным классом, мы не можем его наследовать.
Для вашего сообщения MySpecialPoco вы можете использовать JsonConvert.SerializeObject(message), код такой как ниже:
using Newtonsoft.Json;
static int number = 0;
public static void ProcessQueueMessage([QueueTrigger("queue")] object message, [Blob("container/{queueTrigger}", FileAccess.Read)] Stream myBlob, ILogger logger)
{
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString"));
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("queue-poison");// get the poison queue
CloudQueueMessage msg1 = new CloudQueueMessage(JsonConvert.SerializeObject(message));
number++;
string yourUpdatedString = "\"ErrorMessage\"" + ":" + "\"Unable to find user\"";
string str1 = msg1.AsString;
if (number == 5)
{
msg1.SetMessageContent(str1.Replace("}", "," + yourUpdatedString + "}"));
queue.AddMessage(msg1);
number = 0;
}
logger.LogInformation($"Blob name:{message} \n Size: {myBlob.Length} bytes");
}
Но плохо то, что оба исходных / обновленных сообщения очереди записываются в очередь отравлений.