RxJS дросселирует то же значение, но пропускает новые значения
"Здесь у вас есть", кто-то говорит, и вы получаете этот входной поток значений, который вы хотите сделать в соответствии с...
Input: '1-1----11---2--1122----1---2---2-2-1-2---|'
Output: '1-----------2--1-2-----1---2-------1-2---|'
Пока ничего странного,
Но теперь кто-то говорит "все в порядке", если то же самое значение приходит снова, "но только если это не скоро!". Я хочу по крайней мере '----'
тики между одинаковым значением. "Окей", вы говорите, и вы добавляете газ
const source = new Subject<number>();
// mysterious cave troll is randomly source.next(oneOrTwo)
const example = source.pipe(throttle(val => interval(4000)));
Input: '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
"Это не то, что я хочу! Посмотрите на все значения, которые вы пропустили", имея в виду, что вы ограничиваете все передаваемые значения.
Input: '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
'-------------->1<--------->2<----->1<------|' <-- Missed values
"Вот, давай я покажу тебе шоу", - говорит таинственный человек и дает тебе это
Требуемый выход
Input: '1-1----11---2--1112----1---2---2-2-1-2-----|'
Output: '1------1----2--1--2----1---2-----2-1-------|'
Мой ответ на это таков: такое ощущение, что комбинированное окно не подойдет.
От кого-то более опытного,
это сложная проблема для решения? (или я пропустил очевидное решение)
5 ответов
Я нашел решение, которое работает, кто-то имеет какое-либо отношение к этому?
source.pipe(
windowTime(4000),
concatMap(obs => obs.pipe(distinct()))
);
Примеры из ранее, в примере StackBlitz
ОБНОВЛЕНИЕ: это на самом деле не работает на 100%. Это только принять текущее окно во внимание. Так вы можете, например, иметь
`[1-12][2---]` which would give `1--22---|`
где [----]
будет представлять временное окно. Другими словами, если значение сначала выводится последним в одном окне и сначала выводится в следующем окне, то же самое значение будет проходить сразу после друг друга.
Спасибо @eric99 за то, что заставил меня понять это.
Сначала у меня возникла идея как-то совместить distinctUntilChanged()
а также throttleTimte()
Однако я не смог найти решение, а потом попробовал что-то другое.
Оператор, с которым я придумал throttleDistinct()
работает так, как вы хотели бы: StackBlit Editor Link
Имеет 2 параметра:
duration: number
который в миллисекундах и похож на продолжительность вthrottleTime(duration: number)
equals: (a: T, b: T) => boolean
которая является функцией для сравнения, если предыдущий элемент равен следующему элементу, который имеет реализацию по умолчанию(a, b) => a === b
import { of, fromEvent, interval, Observable } from 'rxjs';
import { map, scan, filter, } from 'rxjs/operators';
const source = fromEvent(document, 'keypress')
.pipe(map((x: any) => x.keyCode as number))
source
.pipe(
throttleDistinct(1000),
)
.subscribe((x) => console.log('__subscribe__', x));
export function throttleDistinct<T>(
duration: number,
equals: (a: T, b: T) => boolean = (a, b) => a === b
) {
return (source: Observable<T>) => {
return source
.pipe(
map((x) => {
const obj = { val: x, time: Date.now(), keep: true };
return obj;
}),
scan((acc, cur) => {
const diff = cur.time - acc.time;
const isSame = equals(acc.val, cur.val)
return diff > duration || (diff < duration && !isSame)
? { ...cur, keep: true }
: { ...acc, keep: false };
}),
filter((x) => x.keep),
map((x) => x.val),
)
}
}
Это моя вторая попытка, она фильтрует поток по выводу (вместо того, чтобы принимать DifferentUntil), затем регулирует и объединяет два потока.
Конечно, у нас может не быть известного набора значений (1,2,...n).
Если я смогу разобраться с этой морщинкой, добавлю еще пример.
const output = merge(
source.pipe( filter(x => x === 1), throttle(val => interval(ms))),
source.pipe( filter(x => x === 2), throttle(val => interval(ms)))
)
Вот мой чек (мс = 4000)
input 1-1----11---2--1112----1---2---2-2-1-2-----
expected 1------1----2--1--2----1---2-----2-1-------
filter(1) 1-1----11------111-----1-----------1-------
throttle(1) 1------1-------1-------1-----------1-------
filter(2) ------------2-----2--------2---2-2---2-----
throttle(2) ------------2-----2--------2-----2---------
merged 1------1----2--1--2----1---2-----2-1-------
expected 1------1----2--1--2----1---2-----2-1-------
Расширение до n значений
Я думаю, что это будет работать, когда набор значений в потоке заранее неизвестен (или имеет большой диапазон, поэтому расширение предыдущего ответа нецелесообразно).
Это должно работать, пока источник завершает.
merge(
source.pipe(
distinct().pipe(
mapTo(distinctVal => source.pipe(
filter(val = val === distinctVal),
throttle(val => interval(ms))
)
)
)
)
У меня еще нет доказательств, я опубликую это позже.
Вне моей головы, вы хотите буферизовать по временному интервалу, а затем различаться в каждом буфере.
Фактически вы хотите перезапускать / перезагружать отдельный прогон каждые n миллисекунд.
source.pipe(
bufferTime(ms),
mergeMap(bufferArray => from(bufferArray).pipe(distinctUntilChanged()) )
)
Вот сложное решение, основанное на теории операторов, но я не уверен, что оно действительно работает, потому что сначала мне нужно смоделировать излучение источника.
Таким образом, для дросселя и отдельного потока всегда кэшируется последнее значение, zip следит за тем, чтобы они всегда излучались в паре, zip всегда будет излучать, когда какой-либо поток излучает, потому что это shareReplay(1).
Мы всегда берем значение emit из ifiveStream, даже когда поток zip запускается с помощью throttle, потому что ProperStream всегда имеет последнее кэшированное значение.
const throttleStream= source.pipe(throttle(val => interval(4000)),shareReplay(1))
const distinctStream= source.pipe(distinctUntilChanged(),shareReplay(1))
zip(throttleStream,distinctStream).pipe(
map((t,d)=>d)
)