ReactiveX: группа и буфер только последний элемент в каждой группе
Как сгруппировать Observable и из каждого GroupedObservable сохранить в памяти только последний отправленный элемент? Так что каждая группа будет вести себя так же, как BehaviorSubject.
Что-то вроде этого:
{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}
Таким образом, в памяти у нас будет только последний элемент для каждого user
:
{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}
И когда подписчик подписывается, эти два пункта были выпущены сразу (каждый в своем собственном выпуске). Как будто у нас был BehaviorSubject для каждого user
,
onCompleted() никогда не должен запускаться, так как люди могут общаться вечно.
Я не знаю заранее, что user
значения там могут быть.
2 ответа
Я предполагаю, что ваш наблюдаемый журнал чата горячий. Следовательно, groupObservables, генерируемые #groupBy, также будут горячими и ничего не будут хранить в памяти.
Чтобы получить желаемое поведение (отменить все, кроме последнего значения до подписки и продолжить оттуда), вы можете использовать ReplaySubject(1).
Пожалуйста, поправьте меня, если я ошибаюсь
смотри jsbin
var groups = chatlog
.groupBy(message => message.user)
.map(groupObservable => {
var subject = new Rx.ReplaySubject(1);
groupObservable.subscribe(value => subject.onNext(value));
return subject;
});
Вы можете написать сокращающую функцию, которая выдает последние испущенные элементы сгруппированных наблюдаемых, передавая их scan
наблюдаемый и использовать shareReplay
вспомнить последние значения, выданные для новых подписчиков. Это было бы что-то вроде этого:
var fn_scan = function ( aMessages, message ) {
// aMessages is the latest array of messages
// this function will update aMessages to reflect the arrival of the new message
var aUsers = aMessages.map(function ( x ) {return x.user;});
var index = aUsers.indexOf(message.user);
if (index > -1) {
// remove previous message from that user...
aMessages.splice(index, 1);
}
// ...and push the latest message
aMessages.push(message);
return aMessages;
};
var groupedLatestMessages$ = messages$
.scan(fn_scan, [])
.shareReplay(1);
Таким образом, каждый раз, когда вы подписываетесь, вы получаете массив, размер которого в любой момент будет равным количеству пользователей, отправивших сообщения, и содержимым которых будут сообщения, отправленные пользователями, упорядоченные по времени распространения.
Каждый раз, когда есть подписка, последний массив немедленно передается подписчику. Это массив, хотя я не могу придумать, как передать значения одно за другим, в то же время выполняя ваши требования. Надеюсь, этого достаточно для вашего варианта использования.
ОБНОВЛЕНИЕ: jsbin здесь http://jsfiddle.net/zs7ydw6b/2