В запущенных очередях веб-заданий Azure можно ли изменить сообщение очереди хранения Azure после сбоя функции веб-задания, но до отравления?

У меня есть функции, запускаемые очередями, в моих веб-заданиях Azure. Нормальное поведение, конечно, это когда функция не работает MaxDequeueCount раз сообщение помещается в соответствующую очередь яда. Я хотел бы изменить сообщение после ошибки, но до вставки ядовитой очереди. Пример:

Начальное сообщение:

{ "Name":"Tom", "Age", 30" }

И после сбоя я хочу изменить сообщение следующим образом и добавить измененное сообщение в очередь ядовитых сообщений:

{ "Name":"Tom", "Age", 30", "ErrorMessage":"Unable to find user" }

Можно ли это сделать?

2 ответа

Согласно документации Webjobs, сообщения будут помещены в очередь ядовитых после 5 неудачных попыток обработать сообщение:

SDK будет вызывать функцию до 5 раз для обработки сообщения очереди. Если пятая попытка не удалась, сообщение перемещается в очередь отравления. Максимальное количество повторов настраивается.

Источник: https://github.com/Azure/azure-webjobs-sdk/wiki/Queues

Это автоматическое поведение. Но вы все равно можете обрабатывать исключения в своем коде функции WebJobs (таким образом, исключение не покидает вашу функцию, и автоматическая обработка вредоносных сообщений не запускается) и помещать измененное сообщение в очередь, используя привязки вывода.

Другой вариант - проверить свойство dequeueCount, которое указывает, сколько раз пытались обработать сообщение.

Вы можете узнать, сколько раз сообщение было взято для обработки, добавив параметр int с именем dequeueCount в вашу функцию. Затем вы можете проверить счетчик очереди в коде функции и выполнить собственную обработку вредоносных сообщений, когда число превышает пороговое значение, как показано в следующем примере.

public static void CopyBlob(
        [QueueTrigger("copyblobqueue")] string blobName, int dequeueCount,
        [Blob("textblobs/{queueTrigger}", FileAccess.Read)] Stream blobInput,
        [Blob("textblobs/{queueTrigger}-new", FileAccess.Write)] Stream blobOutput,
        TextWriter logger)
    {
        if (dequeueCount > 3)
        {
            logger.WriteLine("Failed to copy blob, name=" + blobName);
        }
        else
        {
        blobInput.CopyTo(blobOutput, 4096);
        }
    }

(также взято по ссылке выше).

Ваша подпись функции может выглядеть так

public static void ProcessQueueMessage(
            [QueueTrigger("myqueue")] CloudQueueMessage message,
            [Queue("myqueue-poison")] CloudQueueMessage poisonMessage,
            TextWriter logger)

Максимальное время повторения по умолчанию равно 5. Вы также можете установить это значение самостоятельно, используя свойство Queues.MaxDequeueCount из JobHostConfiguration() Например, код, как показано ниже:

static void Main(string[] args)
{
    var config = new JobHostConfiguration();            
    config.Queues.MaxDequeueCount = 5; // set the maximum retry time
    var host = new JobHost(config);
    host.RunAndBlock();
} 

Затем вы можете обновить сообщение об ошибке в очереди, когда достигнуто максимальное время повтора. Вы можете указать несуществующий контейнер BLOB-объектов для применения механизма повторных попыток. Код как ниже:

public static void ProcessQueueMessage([QueueTrigger("queue")] CloudQueueMessage message, [Blob("container/{queueTrigger}", FileAccess.Read)] Stream myBlob, ILogger logger)
 {
       string yourUpdatedString = "ErrorMessage" + ":" + "Unable to find user";
       string str1 = message.AsString;
       if (message.DequeueCount == 5) // here, the maximum retry time is set to 5
       {
                message.SetMessageContent(str1.Replace("}", "," + yourUpdatedString + "}"));   // modify the failed message here
       }
       logger.LogInformation($"Blob name:{message} \n Size: {myBlob.Length} bytes");
 }

Когда вышеупомянутое сделано, вы можете увидеть обновленное сообщение очереди в очереди-яд.

ОБНОВЛЕНО:

Поскольку CloudQueueMessage является запечатанным классом, мы не можем его наследовать.

Для вашего сообщения MySpecialPoco вы можете использовать JsonConvert.SerializeObject(message), код такой как ниже:

using  Newtonsoft.Json;

static int number = 0;
 public static void ProcessQueueMessage([QueueTrigger("queue")] object message, [Blob("container/{queueTrigger}", FileAccess.Read)] Stream myBlob, ILogger logger)
        {
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString"));    
            CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();                
            CloudQueue queue = queueClient.GetQueueReference("queue-poison");// get the poison queue
            CloudQueueMessage msg1 = new CloudQueueMessage(JsonConvert.SerializeObject(message));
            number++;
            string yourUpdatedString = "\"ErrorMessage\"" + ":" + "\"Unable to find user\"";
            string str1 = msg1.AsString;

            if (number == 5)
            {    
                msg1.SetMessageContent(str1.Replace("}", "," + yourUpdatedString + "}"));
                queue.AddMessage(msg1);                  
                number = 0;                   
            }                
            logger.LogInformation($"Blob name:{message} \n Size: {myBlob.Length} bytes");
        }

Но плохо то, что оба исходных / обновленных сообщения очереди записываются в очередь отравлений.

Другие вопросы по тегам