RxJs: новое значение только по истечении определенного периода времени

Я новичок в ReactiveExtensions, и я не получаю работу s.th. что я думаю, должен быть очень распространенным вариантом использования. Я хочу получить новое значение только по истечении определенного периода времени без нового следующего значения. В нижнем примере этот период времени составляет 1 секунду. Кажется, оператор debounce делает именно то, что я хочу. Я не заставляю его работать все же.

const observable$ = new Rx.Observable(observer => {

 observer.next('start');

 setTimeout(() => {
  observer.next(1);
 }, 100);
 setTimeout(() => {
  observer.next(2);
 }, 200);
 setTimeout(() => {
  observer.next(3);
 }, 300);
 setTimeout(() => {
  observer.next(4);
 }, 400);

 setTimeout(() => {
  observer.next('end');
 }, 1500);
});

let sub = observable$
      .debounce(1000) //debounce(1000, null) does not work either
      .take(100)
      .subscribe(data => {
          console.log(data);
         }, 
         err => console.log(err.message), 
         complete => console.log('Observable completed')
       )

То, что я хочу получить, это консольный вывод только:

"start"
"end"

В моей IDE (Webstorm) приведенный выше код даже не компилируется, хотя в документации говорится, что второй аргумент является необязательным. На jsbin.com я получаю следующую ошибку: "this.durationSelector.call не является функцией" (я признаю, я пока не знаю, как применять планировщики в rxjs). В документации они тоже используют только номер. Большинство примеров debounce, которые я нашел в Google, используют только число, то есть этот пример в Stackru. Почему это не работает в моем случае?

Спасибо за вашу помощь!

PS: я использую rxjs 5.0.0-beta.6.

РЕДАКТИРОВАТЬ: С помощью ответов здесь я нашел реальное решение, которое я хотел:

const observable$ = new Rx.Observable(observer => {

observer.next('start');

 setTimeout(() => {
  observer.next(1);
 }, 1100); //<-- If you change 1100 to i.e. 900 you just get "end" in the output, because there is no 1s periode during which no new value arrives. 
 setTimeout(() => {
  observer.next(2);
 }, 1200);
 setTimeout(() => {
  observer.next(3);
 }, 1300);
 setTimeout(() => {
  observer.next(4);
 }, 1400);
 setTimeout(() => {
  observer.next(5);
 }, 1500);

 setTimeout(() => {
  observer.next('end');
 }, 1501);


});

let sub = observable$
      .debounceTime(1000)
      .take(10)
      .subscribe(data => {
         console.log(data);
       }, 
       err => console.log(err.message), 
       complete => console.log('Observable completed')
);

2 ответа

Решение

Вы можете сделать что-то вроде этого

const observable$ = new Rx.Observable(observer => {

    observer.next('start');

    setTimeout(() => {
        observer.next(1);
    }, 100);
    setTimeout(() => {
        observer.next(2);
    }, 200);
    setTimeout(() => {
        observer.next(3);
    }, 300);
    setTimeout(() => {
        observer.next(4);
    }, 400);

    setTimeout(() => {
        observer.next('end');
    }, 1500);


});

let sub = observable$
    .map(function(x, i) {
        return {
            val: x,
            index: i
        };
    })
    .debounce(function(obj) {
        let interval = obj.index === 0 ? 0 : 1500;
        return Rx.Observable.timer(interval);
    })
    .take(100)
    .subscribe(data => {
            console.log(data.val);
        },
        err => console.log(err.message),
        complete => console.log('Observable completed')
    )

Ключ здесь заключается в использовании map функция, чтобы получить индекс элемента, а затем определить интервал ожидания.

Это просто проблема документации, относящейся к более старой версии RxJS. RxJS 4 перегрузки debounce здесь

Тем не менее, RxJS 5 разделяет оператор так, что их фактически два: один использует функцию выбора для определения длины отката, а второй debounceTime который принимает время.

Следовательно, ваш поток становится:

let sub = observable$
      .debounceTime(1000) //debounce(1000, null) does not work either
      .take(100)
      .subscribe(data => {
          console.log(data);
         }, 
         err => console.log(err.message), 
         complete => console.log('Observable completed')
       )
Другие вопросы по тегам