Передача объектных сообщений в хранилище очередей Azure

Я пытаюсь найти способ передачи объектов в очередь Azure. Я не мог найти способ сделать это.

Как я уже видел, я могу передать строку или байтовый массив, что не очень удобно для передачи объектов.

Есть ли способ передать пользовательские объекты в очередь?

Спасибо!

6 ответов

Решение

Вы можете использовать следующие классы в качестве примера:

 [Serializable]
    public abstract class BaseMessage
    {
        public byte[] ToBinary()
        {
            BinaryFormatter bf = new BinaryFormatter();
            byte[] output = null;
            using (MemoryStream ms = new MemoryStream())
            {
                ms.Position = 0;
                bf.Serialize(ms, this);
                output = ms.GetBuffer();
            }
            return output;
        }

        public static T FromMessage<T>(CloudQueueMessage m)
        {
            byte[] buffer = m.AsBytes;
            T returnValue = default(T);
            using (MemoryStream ms = new MemoryStream(buffer))
            {
                ms.Position = 0;
                BinaryFormatter bf = new BinaryFormatter();
                returnValue = (T)bf.Deserialize(ms);
            }
            return returnValue;
        }
    }

Затем StdQueue (очередь со строгой типизацией):

   public class StdQueue<T> where T : BaseMessage, new()
    {
        protected CloudQueue queue;

        public StdQueue(CloudQueue queue)
        {
            this.queue = queue;
        }

        public void AddMessage(T message)
        {
            CloudQueueMessage msg =
            new CloudQueueMessage(message.ToBinary());
            queue.AddMessage(msg);
        }

        public void DeleteMessage(CloudQueueMessage msg)
        {
            queue.DeleteMessage(msg);
        }

        public CloudQueueMessage GetMessage()
        {
            return queue.GetMessage(TimeSpan.FromSeconds(120));
        }
    }

Затем все, что вам нужно сделать, это наследовать BaseMessage:

[Serializable]
public class ParseTaskMessage : BaseMessage
{
    public Guid TaskId { get; set; }

    public string BlobReferenceString { get; set; }

    public DateTime TimeRequested { get; set; }
}

И создайте очередь, которая работает с этим сообщением:

CloudStorageAccount acc;
            if (!CloudStorageAccount.TryParse(connectionString, out acc))
            {
                throw new ArgumentOutOfRangeException("connectionString", "Invalid connection string was introduced!");
            }
            CloudQueueClient clnt = acc.CreateCloudQueueClient();
            CloudQueue queue = clnt.GetQueueReference(processQueue);
            queue.CreateIfNotExist();
            this._queue = new StdQueue<ParseTaskMessage>(queue);

Надеюсь это поможет!

Метод расширения, использующий Newtonsoft.Json и async

    public static async Task AddMessageAsJsonAsync<T>(this CloudQueue cloudQueue, T objectToAdd)
    {
        var messageAsJson = JsonConvert.SerializeObject(objectToAdd);
        var cloudQueueMessage = new CloudQueueMessage(messageAsJson);
        await cloudQueue.AddMessageAsync(cloudQueueMessage);
    }

Мне нравится этот подход обобщения, но мне не нравится помещать атрибут Serialize во все классы, которые я могу добавить в сообщение, и выводить их из базы (у меня, возможно, уже есть базовый класс), поэтому я использовал...

using System;
using System.Text;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;

namespace Example.Queue
{
    public static class CloudQueueMessageExtensions
    {
        public static CloudQueueMessage Serialize(Object o)
        {
            var stringBuilder = new StringBuilder();
            stringBuilder.Append(o.GetType().FullName);
            stringBuilder.Append(':');
            stringBuilder.Append(JsonConvert.SerializeObject(o));
            return new CloudQueueMessage(stringBuilder.ToString());
        }

        public static T Deserialize<T>(this CloudQueueMessage m)
        {
            int indexOf = m.AsString.IndexOf(':');

            if (indexOf <= 0)
                throw new Exception(string.Format("Cannot deserialize into object of type {0}", 
                    typeof (T).FullName));

            string typeName = m.AsString.Substring(0, indexOf);
            string json = m.AsString.Substring(indexOf + 1);

            if (typeName != typeof (T).FullName)
            {
                throw new Exception(string.Format("Cannot deserialize object of type {0} into one of type {1}", 
                    typeName,
                    typeof (T).FullName));
            }

            return JsonConvert.DeserializeObject<T>(json);
        }
    }
}

например

var myobject = new MyObject();
_queue.AddMessage( CloudQueueMessageExtensions.Serialize(myobject));

var myobject = _queue.GetMessage().Deserialize<MyObject>();

Мне понравился подход @Akodo_Shado к сериализации с Newtonsoft.Json. Я обновил его для Azure.Storage.Queues а также добавлен метод «Получить и удалить», который десериализует объект из очереди.

      public static class CloudQueueExtensions
{
    public static async Task AddMessageAsJsonAsync<T>(this QueueClient queueClient, T objectToAdd) where T : class
    {
        string messageAsJson = JsonConvert.SerializeObject(objectToAdd);
        BinaryData cloudQueueMessage = new BinaryData(messageAsJson);
        await queueClient.SendMessageAsync(cloudQueueMessage);
    }

    public static async Task<T> RetreiveAndDeleteMessageAsObjectAsync<T>(this QueueClient queueClient) where T : class
    {

        QueueMessage[] retrievedMessage = await queueClient.ReceiveMessagesAsync(1);
        if (retrievedMessage.Length == 0) return null;
        string theMessage = retrievedMessage[0].MessageText;
        T instanceOfT = JsonConvert.DeserializeObject<T>(theMessage);
        await queueClient.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);

        return instanceOfT;
    }
}

В RetreiveAndDeleteMessageAsObjectAsync предназначен для обработки 1 сообщения за раз, но вы, очевидно, можете переписать, чтобы десериализовать полный массив сообщений и вернуть ICollection<T> или похожие.

Если очередь хранения используется с функцией WebJob или Azure (довольно распространенный сценарий), то текущий Azure SDK позволяет напрямую использовать объект POCO. Смотрите примеры здесь:

Примечание: SDK будет автоматически использовать Newtonsoft.Json для сериализации / десериализации под капотом.

Это не правильный способ сделать это. очереди не предназначены для хранения объекта. вам нужно поместить объект в BLOB или таблицу (сериализовано). Я считаю, что тело очереди сообщений имеет ограничение размера 64 КБ с SDK1,5 и 8 КБ с более низкими версиями. Тело Messgae предназначено для передачи важных данных для воркера, которые его получают.

Другие вопросы по тегам