Разделить IObservable<byte []> на символы, а затем на строку
Rx великолепен, но иногда трудно найти элегантный способ что-то сделать. Идея довольно проста. Я получаю события с байтом [], этот массив может содержать часть строки, несколько строк или одну строку. То, что я хочу, это найти способ иметь IObservable Line так IObservable<String>
где каждый элемент последовательности будет строкой.
Через несколько часов самое близкое решение, которое я нашел, довольно уродливо, и, конечно, не работает, потому что триггер сканирования OnNext на каждом символе:
//Intermediate subject use to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
//Scan doesn't work it trigger OnNext on every char
//Aggregate doesn't work neither as it doesn't return intermediate result
.Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
.Subscribe(this);
Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
//Data is a byte[]
.Select(_ => _.EventArgs.Data)
.Subscribe(array => array.ToObservable()
//Convert into char
.ForEach(c => outputStream.OnNext((char)c)));
Замечания: _reactiveSubcription
должен быть IObservable<String>
,
Что мне не хватает для того, чтобы эта работа работала без учета кодировки символов?
1 ответ
Это работает для меня.
Сначала преобразуйте byte[] в строку и разбейте строку на \r
(Regex Split сохраняет разделители).
Теперь есть поток строк, некоторые из которых заканчиваются \r
,
Затем Конкат, чтобы держать их в порядке. Кроме того, так как strings
Для следующего шага необходимо указать Hot, опубликуйте их.
var strings = bytes.
Select(arr => (Regex.Split(Encoding.Default.GetString(arr, 0, arr.Length - 1), "(\r)")).
Where(s=> s.Length != 0).
ToObservable()).
Concat().
Publish().
RefCount();
Сделать окно из строк, которое заканчивается, когда строка заканчивается \r
, strings
должен быть горячим, поскольку он используется как для содержимого окна, так и для триггера конца окна.
var linewindows = strings.Window(strings.Where(s => s.EndsWith("\r")));
Объедините каждое окно в одну строку.
var lines = linewindows.SelectMany(w => w.Aggregate((l, r) => l + r));
lines
является IObservable<String>
и каждая строка содержит одну строку.
Чтобы проверить это, я использовал следующий генератор для IObservable<byte[]>
var bytes = Observable.
Range(1, 10).
SelectMany(i => Observable.
Return((byte)('A' + i)).
Repeat(24).
Concat(Observable.
Return((byte)'\r'))).
Window(17).
SelectMany(w => w.ToArray());