Дарт: Как создать поток, который объединяет события из другого потока?
Как лучше всего создать поток, который должен агрегировать несколько событий из другого потока?
Моя цель - создать поток, который объединяет события из другого потока, пока в нем не будет достаточно событий для создания сообщения. В моем случае я читаю данные из потока Socket, поэтому сообщение может быть распределено по разным событиям, а событие может содержать данные для различных сообщений, поэтому я не могу просто применить операцию карты к каждому элементу.
Кажется, что правильным было бы использовать Stream Transformer, но у меня возникают проблемы с поиском информации о том, как правильно его реализовать и без слишком большого количества шаблонного кода.
Я придумал решение, прочитав о том, как создавать потоки, но я не уверен, приемлемо ли это или лучший способ сделать это.
Вот мой пример решения:
Stream<String> joinWordsIfStartWithC(Stream<String> a) async* {
var prevWord= '';
await for (var i in a) {
prevWord += i;
if(i.startsWith('C')){
yield prevWord;
prevWord = '';
}
}
}
Stream<String> periodicStream(Duration interval) async* {
while (true) {
await Future.delayed(interval);
yield 'C';
yield 'A';
yield 'B';
yield 'C';
yield 'C';
yield 'B';
yield 'C';
}
}
void main(List<String> arguments) async {
var intStream = periodicStream(Duration(seconds: 2));
var sStream = joinWordsIfStartWithC(intStream);
sStream.listen((s) => print(s));
}
1 ответ
Я скажу, что ваше решение кажется хорошим, но если вы хотите создать преобразователь потока, это довольно просто, расширив StreamTransformerBase
:
import 'dart:async';
class JoinWordsIfStartWithCTransformer extends StreamTransformerBase<String, String> {
Stream<String> bind(Stream<String> a) async* {
var prevWord = '';
await for (var i in a) {
prevWord += i;
if (i.startsWith('C')) {
yield prevWord;
prevWord = '';
}
}
}
}
Stream<String> periodicStream(Duration interval) async* {
while (true) {
await Future.delayed(interval);
yield 'C';
yield 'A';
yield 'B';
yield 'C';
yield 'C';
yield 'B';
yield 'C';
}
}
void main(List<String> arguments) async {
var intStream = periodicStream(Duration(seconds: 2));
var sStream = intStream.transform(JoinWordsIfStartWithCTransformer());
sStream.listen((s) => print(s));
}