rxdart с несколькими слушателями, вызывающими многократный запуск обратного вызова сканирования для одного потока

У меня есть вопрос о rxdart и конкретно об ожидаемом поведении здесь. Я подписываюсь с 2 слушателями на Observable. С одним слушателем обратный вызов сканирования запускается один раз. С другим слушателем он запускается дважды. Мне кажется неправильным, что на поведение потока в восходящем направлении может повлиять добавление дополнительного прослушивателя.

Я, вероятно, использую что-то неправильно, и если это так, мне бы хотелось точно знать, что я неправильно понимаю о том, как это должно работать.

class DataProvider
{
  Subject<Object> input = new PublishSubject<Object>();
  Observable<Object> output;

  DataProvider()
  {
    output =  input.scan((acc, curr, i){
      print("Ran Scan");
      return (acc as int) + curr;
    },0);
  }
}

void main() async {

  var data = DataProvider();

  var f1 = Future.delayed(Duration(seconds: 0),
          () => data.output.listen((value) => print('first: $value')));
  var f2 = Future.delayed(Duration(seconds: 1),
          () => data.output.listen((value) => print('second: $value')));

  print ("-- even increments for single listener");
  await f1;
  print ("-- added 1");
  data.input.add(1);
  print ("-- added 1");
  data.input.add(1);

  await f2;
  print ("-- added 1");
  data.input.add(1);
}

Одним из решений было оборачивать вывод в PublishConnectableObservable, но мне все равно не имеет смысла, почему оригинал ведет себя так, как он.

0 ответов

Другие вопросы по тегам