ReactiveX: группа и буфер только последний элемент в каждой группе

Как сгруппировать Observable и из каждого GroupedObservable сохранить в памяти только последний отправленный элемент? Так что каждая группа будет вести себя так же, как BehaviorSubject.

Что-то вроде этого:

{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}

Таким образом, в памяти у нас будет только последний элемент для каждого user:

{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}

И когда подписчик подписывается, эти два пункта были выпущены сразу (каждый в своем собственном выпуске). Как будто у нас был BehaviorSubject для каждого user,

onCompleted() никогда не должен запускаться, так как люди могут общаться вечно.

Я не знаю заранее, что user значения там могут быть.

2 ответа

Решение

Я предполагаю, что ваш наблюдаемый журнал чата горячий. Следовательно, groupObservables, генерируемые #groupBy, также будут горячими и ничего не будут хранить в памяти.

Чтобы получить желаемое поведение (отменить все, кроме последнего значения до подписки и продолжить оттуда), вы можете использовать ReplaySubject(1).

Пожалуйста, поправьте меня, если я ошибаюсь

смотри jsbin

var groups = chatlog
      .groupBy(message => message.user)
      .map(groupObservable => {
        var subject = new Rx.ReplaySubject(1);
        groupObservable.subscribe(value => subject.onNext(value));
        return subject;
      });

Вы можете написать сокращающую функцию, которая выдает последние испущенные элементы сгруппированных наблюдаемых, передавая их scan наблюдаемый и использовать shareReplay вспомнить последние значения, выданные для новых подписчиков. Это было бы что-то вроде этого:

var fn_scan = function ( aMessages, message ) {
  // aMessages is the latest array of messages
  // this function will update aMessages to reflect the arrival of the new message
  var aUsers = aMessages.map(function ( x ) {return x.user;});
  var index = aUsers.indexOf(message.user);
  if (index > -1) {
    // remove previous message from that user...
    aMessages.splice(index, 1);
  }
  // ...and push the latest message
  aMessages.push(message);
  return aMessages;
};
var groupedLatestMessages$ = messages$
    .scan(fn_scan, [])
    .shareReplay(1);

Таким образом, каждый раз, когда вы подписываетесь, вы получаете массив, размер которого в любой момент будет равным количеству пользователей, отправивших сообщения, и содержимым которых будут сообщения, отправленные пользователями, упорядоченные по времени распространения.

Каждый раз, когда есть подписка, последний массив немедленно передается подписчику. Это массив, хотя я не могу придумать, как передать значения одно за другим, в то же время выполняя ваши требования. Надеюсь, этого достаточно для вашего варианта использования.

ОБНОВЛЕНИЕ: jsbin здесь http://jsfiddle.net/zs7ydw6b/2

Другие вопросы по тегам