Разобрать огромный OData JSON путем потоковой передачи определенных разделов JSON, чтобы избежать LOH

У меня есть ответ OData в виде JSON (который составляет несколько МБ), и требуется потоковая передача "определенных частей JSON", даже не загружая их в память.

Например: "Когда я читаю собственность" value[0].Body.Content "в приведенном ниже JSON (который будет в мегабайтах) я хочу передать эту часть значения без десериализации ее в объект типа строки. Так что в основном считайте часть значения в байтовый массив фиксированного размера и запишите этот байтовый массив в целевой поток (повторение шага, пока эти данные не закончат обработку).

JSON:

{
    "@odata.context": "https://localhost:5555/api/v2.0/$metadata#Me/Messages",
    "value": [
        {
            "@odata.id": "https://localhost:5555/api/v2.0/",
            "@odata.etag": "W/\"Something\"",
            "Id": "vccvJHDSFds43hwy98fh",
            "CreatedDateTime": "2018-12-01T01:47:53Z",
            "LastModifiedDateTime": "2018-12-01T01:47:53Z",
            "ChangeKey": "SDgf43tsdf",
            "WebLink": "https://localhost:5555/?ItemID=dfsgsdfg9876ijhrf",
            "Body": {
                "ContentType": "HTML",
                "Content": "<html>\r\n<body>Huge Data Here\r\n</body>\r\n</html>\r\n"
            },
            "ToRecipients": [{
                    "EmailAddress": {
                        "Name": "ME",
                        "Address": "me@me.com"
                    }
                }
            ],
            "CcRecipients": [],
            "BccRecipients": [],
            "ReplyTo": [],
            "Flag": {
                "FlagStatus": "NotFlagged"
            }
        }
    ],
    "@odata.nextLink": "http://localhost:5555/rest/jersey/sleep?%24filter=LastDeliveredDateTime+ge+2018-12-01+and+LastDeliveredDateTime+lt+2018-12-02&%24top=50&%24skip=50"
}

Подходы пробовали:
1. Ньютонсофт

Сначала я попытался использовать потоковую передачу Newtonsoft, но она внутренне преобразовывает данные в строку и загружает в память. (Это приводит к тому, что LOH начинает работать, а память не освобождается до тех пор, пока не произойдет сжатие - у нас есть ограничение памяти для нашего рабочего процесса, и мы не можем сохранить его в памяти)

**code:**

    using (var jsonTextReader = new JsonTextReader(sr))
    {
        var pool = new CustomArrayPool();
        // Checking if pooling will help with memory
        jsonTextReader.ArrayPool = pool;

        while (jsonTextReader.Read())
        {
            if (jsonTextReader.TokenType == JsonToken.PropertyName
                && ((string)jsonTextReader.Value).Equals("value"))
            {
                jsonTextReader.Read();

                if (jsonTextReader.TokenType == JsonToken.StartArray)
                {
                    while (jsonTextReader.Read())
                    {
                        if (jsonTextReader.TokenType == JsonToken.StartObject)
                        {
                            var Current = JToken.Load(jsonTextReader);
                            // By Now, the LOH Shoots up.
                            // Avoid below code of converting this JToken back to byte array.
                            destinationStream.write(Encoding.ASCII.GetBytes(Current.ToString()));
                        }
                        else if (jsonTextReader.TokenType == JsonToken.EndArray)
                        {
                            break;
                        }
                    }
                }
            }

            if (jsonTextReader.TokenType == JsonToken.StartObject)
            {
                var Current = JToken.Load(jsonTextReader);
                // Do some processing with Current
                destinationStream.write(Encoding.ASCII.GetBytes(Current.ToString()));
            }
        }
    }
  1. OData.Net:

    Я подумал, можно ли это сделать с помощью библиотеки OData.Net, так как она поддерживает потоковую передачу строковых полей. Но я не мог уйти далеко, так как в итоге я создал Модель для данных, что означало бы, что значение будет преобразовано в один строковый объект из МБ.

    Код

    ODataMessageReaderSettings settings = new ODataMessageReaderSettings();
    IODataResponseMessage responseMessage = new InMemoryMessage { Stream = stream };
    responseMessage.SetHeader("Content-Type", "application/json;odata.metadata=minimal;");
    // ODataMessageReader reader = new ODataMessageReader((IODataResponseMessage)message, settings, GetEdmModel());
    ODataMessageReader reader = new ODataMessageReader(responseMessage, settings, new EdmModel());
    var oDataResourceReader = reader.CreateODataResourceReader();
    var property = reader.ReadProperty();
    


Любая идея, как разобрать этот JSON по частям, используя OData.Net/Newtonsoft и потоковое значение определенных полей?
Это единственный способ сделать это, вручную проанализировать поток?

1 ответ

Решение

Если вы копируете части JSON из одного потока в другой, вы можете сделать это более эффективно с JsonWriter.WriteToken(JsonReader) таким образом избегая промежуточного Current = JToken.Load(jsonTextReader) а также Encoding.ASCII.GetBytes(Current.ToString()) представления и связанные с ними накладные расходы памяти:

using (var textWriter = new StreamWriter(destinationStream, new UTF8Encoding(false, true), 1024, true))
using (var jsonWriter = new JsonTextWriter(textWriter) { Formatting = Formatting.Indented, CloseOutput = false })
{
    // Use Formatting.Indented or Formatting.None as required.
    jsonWriter.WriteToken(jsonTextReader);
}

Тем не менее, Json.NET JsonTextReader не может читать одно строковое значение в "чанках" так же, как XmlReader.ReadValueChunk(), Он всегда полностью материализует каждое атомарное значение строки. Если ваши строковые значения настолько велики, что они собираются в куче больших объектов, даже используя JsonWriter.WriteToken() не будет препятствовать тому, чтобы эти строки были полностью загружены в память.

В качестве альтернативы вы можете рассмотреть читателей и писателей, возвращенных JsonReaderWriterFactory, Эти читатели и писатели используются DataContractJsonSerializer и переводить JSON в XML на лету во время чтения и записи. Поскольку базовые классы для этих читателей и писателей XmlReader а также XmlWriter они поддерживают чтение и запись строковых значений в чанках. Правильное их использование позволит избежать размещения строк в куче больших объектов.

Для этого сначала определите следующие методы расширения, которые копируют выбранное подмножество значений (й) JSON из входного потока в выходной поток, как указано путем к данным, которые будут передаваться в поток:

public static class JsonExtensions
{
    public static void StreamNested(Stream from, Stream to, string [] path)
    {
        var reversed = path.Reverse().ToArray();

        using (var xr = JsonReaderWriterFactory.CreateJsonReader(from, XmlDictionaryReaderQuotas.Max))
        {
            foreach (var subReader in xr.ReadSubtrees(s => s.Select(n => n.LocalName).SequenceEqual(reversed)))
            {
                using (var xw = JsonReaderWriterFactory.CreateJsonWriter(to, Encoding.UTF8, false))
                {
                    subReader.MoveToContent();

                    xw.WriteStartElement("root");
                    xw.WriteAttributes(subReader, true);

                    subReader.Read();

                    while (!subReader.EOF)
                    {
                        if (subReader.NodeType == XmlNodeType.Element && subReader.Depth == 1)
                            xw.WriteNode(subReader, true);
                        else
                            subReader.Read();
                    }

                    xw.WriteEndElement();
                }
            }
        }
    }
}

public static class XmlReaderExtensions
{
    public static IEnumerable<XmlReader> ReadSubtrees(this XmlReader xmlReader, Predicate<Stack<XName>> filter)
    {
        Stack<XName> names = new Stack<XName>();

        while (xmlReader.Read())
        {
            if (xmlReader.NodeType == XmlNodeType.Element)
            {
                names.Push(XName.Get(xmlReader.LocalName, xmlReader.NamespaceURI));
                if (filter(names))
                {
                    using (var subReader = xmlReader.ReadSubtree())
                    {
                        yield return subReader;
                    }
                }
            }

            if ((xmlReader.NodeType == XmlNodeType.Element && xmlReader.IsEmptyElement)
                || xmlReader.NodeType == XmlNodeType.EndElement)
            {
                names.Pop();
            }
        }
    }
}

Теперь string [] path аргумент StreamNested() это не какой-либо путь jsonpath. Вместо этого это путь, соответствующий иерархии элементов XML, соответствующих JSON, который вы хотите выбрать в переводе XmlReader вернулся JsonReaderWriterFactory.CreateJsonReader() , Сопоставление, используемое для этого перевода, в свою очередь, задокументировано Microsoft в " Сопоставлении между JSON и XML". Чтобы выбрать и передать только те значения JSON, которые соответствуют value[*] требуется XML-путь //root/value/item, Таким образом, вы можете выбирать и передавать нужные вложенные объекты, выполнив:

JsonExtensions.StreamNested(inputStream, destinationStream, new[] { "root", "value", "item" });

Заметки:

  • Отображение между JSON и XML довольно сложно. Часто проще просто загрузить образец JSON в XDocument используя следующий метод расширения:

    static XDocument ParseJsonAsXDocument(string json)
    {
        using (var xr = JsonReaderWriterFactory.CreateJsonReader(new MemoryStream(Encoding.UTF8.GetBytes(json)), Encoding.UTF8, XmlDictionaryReaderQuotas.Max, null))
        {
            return XDocument.Load(xr);
        }
    }
    

    А затем определите правильный XML-путь на основе наблюдений.

  • По вопросам, связанным с, см. JObject.SelectToken Эквивалент в.NET.

Другие вопросы по тегам