Как сохранить накопленный результат сканирования с помощью rxjs
У меня есть две объединенные наблюдаемые со сканированием после объединения. Первый - простой диапазон, а другой - Предмет. Всякий раз, когда субъект выдает новое значение с onNext
Я объединяю это значение в скане и возвращаю новый массив в качестве аккумулятора. Если я избавляюсь от своей подписки, а затем снова подписываюсь, она воспроизводит значения из диапазона, но я теряю значения из темы. В приведенном ниже коде я хочу, чтобы моя вторая подписка имела окончательное значение [1, 2, 3, 4, 5]
Каков был бы лучший способ сделать это? Прямо сейчас у меня есть другой предмет, где я сохраняю это окончательное значение и подписываюсь на него, но это неправильно.
Вот простая версия, которая демонстрирует, что происходит:
var Rx = require('rx');
var source = Rx.Observable.range(1, 3);
var adder = new Rx.Subject();
var merged = source.merge(adder)
.scan([], function(accum, x) {
return accum.concat(x);
});
var subscription1 = merged.subscribe(function(x) {console.log(x)});
adder.onNext(4);
adder.onNext(5);
subscription1.dispose();
console.log('After Disposal');
var subscription2 = merged.subscribe(function(x) {console.log(x)});
Это выводит:
[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
[ 1, 2, 3, 4 ]
[ 1, 2, 3, 4, 5 ]
After Disposal
[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
2 ответа
Тема - это горячая Наблюдаемая, поэтому вторая подписка не будет видеть события, исходящие от Субъекта. Диапазон Observable является холодным, поэтому каждый "экземпляр выполнения" полностью принадлежит каждой подписке. С другой стороны, "экземпляр выполнения" субъекта является одноэлементным и независимым, поэтому вторая подписка не видит события.
Есть несколько способов решения этой проблемы.
- Используйте ReplaySubject. Вы должны будете указать размер буфера. Если у вас нет подходящего ограничения для этого буфера, использование неограниченного буфера может вызвать проблемы с памятью.
- Избегайте использования темы. Другими словами, избегайте использования горячей Observable, замените ее холодной Observable, и в соответствии с описанием проблемы, которое я дал в начале, у ваших подписок не будет проблемы, и они будут видеть те же события. Обычно субъект может быть заменен наблюдаемым, но это зависит от вашей общей архитектуры. Возможно, нужно много переписать. И в худшем случае, например, при круговой зависимости Observables, вы не можете избежать использования Subject.
- Перегруппируйте код так, чтобы подписки начинались до того, как субъект начинает излучать, так что все подписки получают возможность увидеть "живые" выбросы от горячих наблюдаемых.
Однако, если моя интерпретация этой проблемы верна, вам нужно только последнее событие, испущенное merged
, так что вы можете использовать вариант alternative (1), где вы воспроизводите только последнее событие. Это было бы вопросом добавления .shareReplay(1)
в merged
, что сделает его горячим повтором Observable.
Из вашего описания звучит так, будто вы ищете BehaviorSubject
или же ReplaySubject
(или их соответствующие операторы, publishValue()
а также replay()
), скорее, чем scan
,
Вы можете использовать следующее как способ подключения вашего состояния при каждом повторном посещении страницы (поскольку вы упомянули об этом, я буду использовать пример приложения todo):
var todos = buttonClicked
.map(function(e) { return newTodoBox.text(); })
//This will preserve the state of the todo list across subscriptions
.replay();
var todoSubscription;
pageActivated.subscribe(function() {
todoSubscription = new Rx.CompositeDisposable(
//Add the items to the list
todos.subscribe(addItem),
//This enables the replay to actually start
//receiving values
todos.connect());
/*Do other activation work*/
});
pageDeactivated.subscribe(function() {
todoSubscription && todoSubscription.dispose();
/*Do other clean up work*/
});
И вот пример рабочего кода, который, я думаю, отражает то, что вы ищете, обратите внимание, в этом случае мне не нужно сканировать, так как список полностью регидратируется replay()
дальше по цепочке. Вы заметите, что если вы добавите некоторые элементы todo, затем отключитесь и снова подключитесь, чтобы эти элементы снова появились.
var buttonClicked = Rx.Observable.fromEvent($('#add'), 'click');
var disconnect = Rx.Observable.fromEvent($('#disconnect'), 'click');
var connect = Rx.Observable.fromEvent($('#connect'), 'click');
var newTodoBox = $('#todos')[0];
var mylist = $('#mylist');
function addItem(e) {
mylist.append('<li>' + e + '</li>');
}
var todos = buttonClicked
.map(function(e) {
return newTodoBox.value;
})
.replay();
var todoSubscription;
disconnect.subscribe(function() {
mylist.empty();
todoSubscription && todoSubscription.dispose();
});
connect.startWith(true).subscribe(function() {
todoSubscription = new Rx.CompositeDisposable(
todos.subscribe(addItem),
todos.connect());
});
<head>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
</head>
<body>
<input id="todos" />
<button id="add">Add Todo</button>
<div>
<ul id='mylist'></ul>
</div>
<div>
<button id='disconnect'>Disconnect</button>
<button id='connect'>Connect</button>
</div>
</body>