RxJs: новое значение только по истечении определенного периода времени
Я новичок в ReactiveExtensions, и я не получаю работу s.th. что я думаю, должен быть очень распространенным вариантом использования. Я хочу получить новое значение только по истечении определенного периода времени без нового следующего значения. В нижнем примере этот период времени составляет 1 секунду. Кажется, оператор debounce делает именно то, что я хочу. Я не заставляю его работать все же.
const observable$ = new Rx.Observable(observer => {
observer.next('start');
setTimeout(() => {
observer.next(1);
}, 100);
setTimeout(() => {
observer.next(2);
}, 200);
setTimeout(() => {
observer.next(3);
}, 300);
setTimeout(() => {
observer.next(4);
}, 400);
setTimeout(() => {
observer.next('end');
}, 1500);
});
let sub = observable$
.debounce(1000) //debounce(1000, null) does not work either
.take(100)
.subscribe(data => {
console.log(data);
},
err => console.log(err.message),
complete => console.log('Observable completed')
)
То, что я хочу получить, это консольный вывод только:
"start"
"end"
В моей IDE (Webstorm) приведенный выше код даже не компилируется, хотя в документации говорится, что второй аргумент является необязательным. На jsbin.com я получаю следующую ошибку: "this.durationSelector.call не является функцией" (я признаю, я пока не знаю, как применять планировщики в rxjs). В документации они тоже используют только номер. Большинство примеров debounce, которые я нашел в Google, используют только число, то есть этот пример в Stackru. Почему это не работает в моем случае?
Спасибо за вашу помощь!
PS: я использую rxjs 5.0.0-beta.6.
РЕДАКТИРОВАТЬ: С помощью ответов здесь я нашел реальное решение, которое я хотел:
const observable$ = new Rx.Observable(observer => {
observer.next('start');
setTimeout(() => {
observer.next(1);
}, 1100); //<-- If you change 1100 to i.e. 900 you just get "end" in the output, because there is no 1s periode during which no new value arrives.
setTimeout(() => {
observer.next(2);
}, 1200);
setTimeout(() => {
observer.next(3);
}, 1300);
setTimeout(() => {
observer.next(4);
}, 1400);
setTimeout(() => {
observer.next(5);
}, 1500);
setTimeout(() => {
observer.next('end');
}, 1501);
});
let sub = observable$
.debounceTime(1000)
.take(10)
.subscribe(data => {
console.log(data);
},
err => console.log(err.message),
complete => console.log('Observable completed')
);
2 ответа
Вы можете сделать что-то вроде этого
const observable$ = new Rx.Observable(observer => {
observer.next('start');
setTimeout(() => {
observer.next(1);
}, 100);
setTimeout(() => {
observer.next(2);
}, 200);
setTimeout(() => {
observer.next(3);
}, 300);
setTimeout(() => {
observer.next(4);
}, 400);
setTimeout(() => {
observer.next('end');
}, 1500);
});
let sub = observable$
.map(function(x, i) {
return {
val: x,
index: i
};
})
.debounce(function(obj) {
let interval = obj.index === 0 ? 0 : 1500;
return Rx.Observable.timer(interval);
})
.take(100)
.subscribe(data => {
console.log(data.val);
},
err => console.log(err.message),
complete => console.log('Observable completed')
)
Ключ здесь заключается в использовании map
функция, чтобы получить индекс элемента, а затем определить интервал ожидания.
Это просто проблема документации, относящейся к более старой версии RxJS. RxJS 4 перегрузки debounce
здесь
Тем не менее, RxJS 5 разделяет оператор так, что их фактически два: один использует функцию выбора для определения длины отката, а второй debounceTime
который принимает время.
Следовательно, ваш поток становится:
let sub = observable$
.debounceTime(1000) //debounce(1000, null) does not work either
.take(100)
.subscribe(data => {
console.log(data);
},
err => console.log(err.message),
complete => console.log('Observable completed')
)