Что такое хороший метод для обработки линейных сетевых потоков ввода-вывода?

Примечание: позвольте мне извиниться за длину этого вопроса, мне пришлось поместить в него много информации. Я надеюсь, что это не заставит слишком многих людей просто просматривать и делать предположения. Пожалуйста, прочитайте полностью. Благодарю.

У меня есть поток данных, поступающих через сокет. Эти данные ориентированы на строки.

Я использую APM (метод асинхронного программирования).NET (BeginRead и т. Д.). Это исключает использование потокового ввода-вывода, потому что асинхронный ввод-вывод основан на буфере. Можно упаковать данные и отправить их в поток, такой как поток памяти, но есть и проблемы там.

Проблема в том, что мой входной поток (который я не контролирую) не дает мне никакой информации о том, как долго этот поток. Это просто поток строк новой строки, выглядящих так:

COMMAND\n
...Unpredictable number of lines of data...\n
END COMMAND\n
....repeat....

Таким образом, используя APM, и, поскольку я не знаю, как долго будет длиться какой-либо конкретный набор данных, вполне вероятно, что блоки данных будут пересекать границы буфера, требуя многократных чтений, но эти множественные чтения также будут охватывать несколько блоков данных.

Пример:

Byte buffer[1024] = ".................blah\nThis is another l"
[another read]
                    "ine\n.............................More Lines..."

Моей первой мыслью было использовать StringBuilder и просто добавить строки буфера в SB. Это работает в некоторой степени, но мне было трудно извлечь блоки данных. Я пытался использовать StringReader для чтения новых строк, но не было никакого способа узнать, была ли получена полная строка или нет, так как StringReader возвращает частичную строку в конце последнего добавленного блока с последующим возвратом нулевого значения после. Нет способа узнать, была ли возвращенная строка полностью новой строкой данных.

Пример:

// Note: no newline at the end
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp..");
StringReader sr = new StringReader(sb);
string s = sr.ReadLine(); // returns "This is a line"
s = sr.ReadLine();        // returns "This is incomp.."

Что еще хуже, это то, что если я просто продолжаю добавлять данные, буферы становятся все больше и больше, и, поскольку это может работать неделями или месяцами, это не очень хорошее решение.

Моей следующей мыслью было удаление блоков данных из СБ, когда я их читал. Это потребовало написания моей собственной функции ReadLine, но затем я застрял, блокируя данные во время чтения и записи. Кроме того, большие блоки данных (которые могут состоять из сотен операций чтения и мегабайтов данных) требуют сканирования всего буфера в поисках новых строк. Это не эффективно и довольно уродливо.

Я ищу что-то, что имеет простоту StreamReader/Writer с удобством асинхронного ввода-вывода.

Моей следующей мыслью было использование MemoryStream и запись блоков данных в поток памяти, затем присоединение StreamReader к потоку и использование ReadLine, но опять же у меня возникают проблемы с определением, является ли последнее чтение в буфере полной строкой или нет, плюс еще труднее удалить "устаревшие" данные из потока.

Я также подумал об использовании потока с синхронным чтением. Это имеет то преимущество, что при использовании StreamReader он всегда будет возвращать полную строку из ReadLine(), за исключением случаев разрыва соединения. Однако это имеет проблемы с отменой соединения, и определенные виды сетевых проблем могут привести к зависанию блокирующих сокетов в течение продолжительного периода времени. Я использую асинхронный ввод-вывод, потому что я не хочу связывать поток для жизни программы, блокирующей получение данных.

Связь продолжительна. И данные будут течь со временем. Во время первоначального соединения существует большой поток данных, и как только этот поток завершен, сокет остается открытым в ожидании обновлений в реальном времени. Я не знаю точно, когда начальный поток "закончился", поскольку единственный способ узнать, что больше никаких данных не отправляется сразу. Это означает, что я не могу дождаться окончания начальной загрузки данных перед обработкой, я в значительной степени застрял в обработке "в режиме реального времени", когда она поступает.

Итак, кто-нибудь может предложить хороший способ справиться с этой ситуацией, не слишком сложным? Я действительно хочу, чтобы это было максимально просто и элегантно, но я продолжаю придумывать все более и более сложные решения из-за всех крайних случаев. Я предполагаю, что мне нужен какой-то FIFO, в котором я могу легко добавлять новые данные и одновременно извлекать из них данные, соответствующие определенным критериям (т. Е. Строки, оканчивающиеся символом новой строки).

2 ответа

Решение

Это довольно интересный вопрос. В прошлом я решил использовать отдельный поток с синхронными операциями, как вы предлагаете. (Мне удалось обойти большинство проблем с блокировкой сокетов с помощью блокировок и большого количества обработчиков исключений.) Тем не менее, использование встроенных асинхронных операций, как правило, целесообразно, поскольку допускает настоящий асинхронный ввод-вывод на уровне ОС, поэтому я понимаю, ваша точка зрения.

Ну, я пошел и написал класс для выполнения того, что я считаю, вам нужно (в относительно чистой форме, я бы сказал). Дайте мне знать, что вы думаете.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public class AsyncStreamProcessor : IDisposable
{
    protected StringBuilder _buffer;  // Buffer for unprocessed data.

    private bool _isDisposed = false; // True if object has been disposed

    public AsyncStreamProcessor()
    {
        _buffer = null;
    }

    public IEnumerable<string> Process(byte[] newData)
    {
        // Note: replace the following encoding method with whatever you are reading.
        // The trick here is to add an extra line break to the new data so that the algorithm recognises
        // a single line break at the end of the new data.
        using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine))
        {
            // Read all lines from new data, returning all but the last.
            // The last line is guaranteed to be incomplete (or possibly complete except for the line break,
            // which will be processed with the next packet of data).
            string line, prevLine = null;
            while ((line = newDataReader.ReadLine()) != null)
            {
                if (prevLine != null)
                {
                    yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine;
                    _buffer = null;
                }
                prevLine = line;
            }

            // Store last incomplete line in buffer.
            if (_buffer == null)
                // Note: the (* 2) gives you the prediction of the length of the incomplete line, 
                // so that the buffer does not have to be expanded in most/all situations. 
                // Change it to whatever seems appropiate.
                _buffer = new StringBuilder(prevLine, prevLine.Length * 2);
            else
                _buffer.Append(prevLine);
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            if (disposing)
            {
                // Dispose managed resources.
                _buffer = null;
                GC.Collect();
            }

            // Dispose native resources.

            // Remember that object has been disposed.
            _isDisposed = true;
        }
    }
}

Экземпляр этого класса должен быть создан для каждого NetworkStream, и функция Process должна вызываться при каждом получении новых данных (в методе обратного вызова для BeginRead перед вызовом следующего BeginRead, который я себе представляю).

Примечание. Я проверял этот код только с тестовыми данными, а не с фактическими данными, передаваемыми по сети. Тем не менее, я не ожидаю каких-либо различий...

Кроме того, предупреждение о том, что класс, конечно, не является потокобезопасным, но до тех пор, пока BeginRead не будет выполнен снова до тех пор, пока не будут обработаны текущие данные (как я полагаю, вы делаете), проблем быть не должно.

Надеюсь, что это работает для вас. Дайте мне знать, если остались проблемы, и я постараюсь изменить решение, чтобы справиться с ними. (Там может быть некоторая тонкость вопроса, который я пропустил, несмотря на то, что внимательно его прочитал!)

То, что вы объясняете в своем вопросе, очень напоминает мне строки ASCIZ. ( текст ссылки). Это может быть полезным началом.

Мне нужно было написать нечто подобное в колледже для проекта, над которым я работал. К сожалению, у меня был контроль над отправляющим сокетом, поэтому я добавил длину поля сообщения как часть протокола. Тем не менее, я думаю, что подобный подход может принести вам пользу.

Как я подошел к своему решению, я отправил что-то вроде 5HELLO, поэтому сначала я увидел 5 и узнал, что у меня длина сообщения 5, а для этого мне нужно было 5 символов. Однако, если при моем асинхронном чтении я получил только 5HE, я увидел бы, что у меня длина сообщения 5, но я смог прочитать только 3 байта без проводов (предположим, символы ASCII). Из-за этого я знал, что мне не хватает нескольких байтов, и хранил то, что у меня было, во фрагментном буфере. У меня был один буфер фрагментов на сокет, чтобы избежать проблем с синхронизацией. Грубый процесс есть.

  1. Чтение из сокета в байтовый массив, запись количества прочитанных байтов.
  2. Просматривайте побайтово до тех пор, пока не найдете символ новой строки (это становится очень сложно, если вы не получаете символы ascii, но символы, которые могут быть несколькими байтами, вы для этого сами)
  3. Преврати свой буфер фрагмента в строку и добавь свой буфер чтения до новой строки. Удалите эту строку как завершенное сообщение в очередь или ее собственный делегат для обработки. (вы можете оптимизировать эти буферы, фактически записав сокет для записи в тот же байтовый массив, что и фрагмент, но это сложнее объяснить)
  4. Продолжайте цикл, каждый раз, когда мы находим новую строку, создаем строку из байтовой последовательности из записанной начальной / конечной позиции и помещаем в очередь / делегат для обработки.
  5. Как только мы дойдем до конца нашего буфера чтения, скопируйте все, что осталось в буфер фрагмента.
  6. Вызовите BeginRead для сокета, который перейдет к шагу 1., когда данные будут доступны в сокете.

Затем вы используете другой поток для чтения вашей очереди несогласованных сообщений или просто позволяете потоку потоков обрабатывать его с помощью делегатов. И делать любую обработку данных, которую вы должны сделать. Кто-то исправит меня, если я ошибаюсь, но с этим очень мало проблем с синхронизацией потоков, так как вы можете только читать или ожидать чтения из сокета в любое время, так что не беспокойтесь о блокировках (кроме случаев, когда вы заполняя очередь, я использовал делегатов в моей реализации). Есть несколько деталей, которые вам нужно будет проработать самостоятельно, например, какой размер буфера фрагмента оставить, если при чтении вы получаете 0 новых строк, все сообщение должно быть добавлено в буфер фрагментов без перезаписи что-нибудь. Я думаю, что в итоге мне потребовалось около 700 - 800 строк кода, но это включало настройку соединения, согласование для шифрования и некоторые другие вещи.

Эта установка очень хорошо для меня; Я смог выполнить до 80 Мбит / с по локальной сети 100 Мбит / с, используя эту опцию 1,8 ГГц, включая обработку шифрования. А поскольку вы привязаны к сокету, сервер будет масштабироваться, поскольку одновременно можно работать с несколькими сокетами. Если вам нужно, чтобы элементы обрабатывались по порядку, вам нужно использовать очередь, но если порядок не имеет значения, делегаты дадут вам очень масштабируемую производительность из пула потоков.

Надеюсь, что это поможет, не для того, чтобы быть законченным решением, а для того, чтобы начать поиск.

* Просто обратите внимание, моя реализация была отключена исключительно на байтовом уровне и поддерживала шифрование, я использовал символы для своего примера, чтобы сделать его проще для визуализации.

Другие вопросы по тегам