Разделить IObservable<byte []> на символы, а затем на строку

Rx великолепен, но иногда трудно найти элегантный способ что-то сделать. Идея довольно проста. Я получаю события с байтом [], этот массив может содержать часть строки, несколько строк или одну строку. То, что я хочу, это найти способ иметь IObservable Line так IObservable<String>где каждый элемент последовательности будет строкой.

Через несколько часов самое близкое решение, которое я нашел, довольно уродливо, и, конечно, не работает, потому что триггер сканирования OnNext на каждом символе:

//Intermediate subject use to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
    //Scan doesn't work it trigger OnNext on every char
    //Aggregate doesn't work neither as it doesn't return intermediate result
    .Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
    .Subscribe(this);


Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
            //Data is a byte[]
            .Select(_ => _.EventArgs.Data)
            .Subscribe(array => array.ToObservable()
            //Convert into char
            .ForEach(c => outputStream.OnNext((char)c)));

Замечания: _reactiveSubcription должен быть IObservable<String>,

Что мне не хватает для того, чтобы эта работа работала без учета кодировки символов?

1 ответ

Решение

Это работает для меня.

Сначала преобразуйте byte[] в строку и разбейте строку на \r (Regex Split сохраняет разделители).

Теперь есть поток строк, некоторые из которых заканчиваются \r,

Затем Конкат, чтобы держать их в порядке. Кроме того, так как strings Для следующего шага необходимо указать Hot, опубликуйте их.

var strings = bytes.
  Select(arr => (Regex.Split(Encoding.Default.GetString(arr, 0, arr.Length - 1), "(\r)")).
    Where(s=> s.Length != 0).
    ToObservable()).
  Concat().
  Publish().
  RefCount();

Сделать окно из строк, которое заканчивается, когда строка заканчивается \r, strings должен быть горячим, поскольку он используется как для содержимого окна, так и для триггера конца окна.

var linewindows = strings.Window(strings.Where(s => s.EndsWith("\r")));

Объедините каждое окно в одну строку.

var lines = linewindows.SelectMany(w => w.Aggregate((l, r) => l + r));

lines является IObservable<String> и каждая строка содержит одну строку.

Чтобы проверить это, я использовал следующий генератор для IObservable<byte[]>

var bytes = Observable.
Range(1, 10).
SelectMany(i => Observable.
    Return((byte)('A' + i)).
    Repeat(24).
    Concat(Observable.
        Return((byte)'\r'))).
Window(17).
SelectMany(w => w.ToArray());
Другие вопросы по тегам