RxJS дросселирует то же значение, но пропускает новые значения

"Здесь у вас есть", кто-то говорит, и вы получаете этот входной поток значений, который вы хотите сделать в соответствии с...

Input:  '1-1----11---2--1122----1---2---2-2-1-2---|'
Output: '1-----------2--1-2-----1---2-------1-2---|'

Пока ничего странного,
Но теперь кто-то говорит "все в порядке", если то же самое значение приходит снова, "но только если это не скоро!". Я хочу по крайней мере '----' тики между одинаковым значением. "Окей", вы говорите, и вы добавляете газ

const source = new Subject<number>();

// mysterious cave troll is randomly source.next(oneOrTwo)

const example = source.pipe(throttle(val => interval(4000)));

Input:  '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'

"Это не то, что я хочу! Посмотрите на все значения, которые вы пропустили", имея в виду, что вы ограничиваете все передаваемые значения.

Input:  '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
        '-------------->1<--------->2<----->1<------|' <-- Missed values

"Вот, давай я покажу тебе шоу", - говорит таинственный человек и дает тебе это

Требуемый выход

Input:  '1-1----11---2--1112----1---2---2-2-1-2-----|'
Output: '1------1----2--1--2----1---2-----2-1-------|'

Мой ответ на это таков: такое ощущение, что комбинированное окно не подойдет.

От кого-то более опытного,
это сложная проблема для решения? (или я пропустил очевидное решение)

5 ответов

Решение

Я нашел решение, которое работает, кто-то имеет какое-либо отношение к этому?

source.pipe(
   windowTime(4000),
   concatMap(obs => obs.pipe(distinct()))
);

Примеры из ранее, в примере StackBlitz

ОБНОВЛЕНИЕ: это на самом деле не работает на 100%. Это только принять текущее окно во внимание. Так вы можете, например, иметь

`[1-12][2---]` which would give `1--22---|`

где [----] будет представлять временное окно. Другими словами, если значение сначала выводится последним в одном окне и сначала выводится в следующем окне, то же самое значение будет проходить сразу после друг друга.

Спасибо @eric99 за то, что заставил меня понять это.

Сначала у меня возникла идея как-то совместить distinctUntilChanged() а также throttleTimte() Однако я не смог найти решение, а потом попробовал что-то другое.

Оператор, с которым я придумал throttleDistinct() работает так, как вы хотели бы: StackBlit Editor Link

Имеет 2 параметра:

  1. duration: number который в миллисекундах и похож на продолжительность в throttleTime(duration: number)
  2. equals: (a: T, b: T) => boolean которая является функцией для сравнения, если предыдущий элемент равен следующему элементу, который имеет реализацию по умолчанию (a, b) => a === b

import { of, fromEvent, interval, Observable } from 'rxjs';
import { map, scan, filter, } from 'rxjs/operators';

const source = fromEvent(document, 'keypress')
  .pipe(map((x: any) => x.keyCode as number))

source
  .pipe(
    throttleDistinct(1000),
  )
  .subscribe((x) => console.log('__subscribe__', x));

export function throttleDistinct<T>(
  duration: number,
  equals: (a: T, b: T) => boolean = (a, b) => a === b
) {
  return (source: Observable<T>) => {
    return source
      .pipe(
        map((x) => {
          const obj = { val: x, time: Date.now(), keep: true };
          return obj;
        }),
        scan((acc, cur) => {
          const diff = cur.time - acc.time;

          const isSame = equals(acc.val, cur.val)
          return diff > duration || (diff < duration && !isSame)
            ? { ...cur, keep: true }
            : { ...acc, keep: false };
        }),
        filter((x) => x.keep),
        map((x) => x.val),
      )
  }
}

Это моя вторая попытка, она фильтрует поток по выводу (вместо того, чтобы принимать DifferentUntil), затем регулирует и объединяет два потока.

Конечно, у нас может не быть известного набора значений (1,2,...n).
Если я смогу разобраться с этой морщинкой, добавлю еще пример.

const output = merge(
  source.pipe( filter(x => x === 1), throttle(val => interval(ms))),
  source.pipe( filter(x => x === 2), throttle(val => interval(ms)))
)

Вот мой чек (мс = 4000)

input         1-1----11---2--1112----1---2---2-2-1-2-----
expected      1------1----2--1--2----1---2-----2-1-------

filter(1)     1-1----11------111-----1-----------1-------
throttle(1)   1------1-------1-------1-----------1-------

filter(2)     ------------2-----2--------2---2-2---2-----
throttle(2)   ------------2-----2--------2-----2---------

merged        1------1----2--1--2----1---2-----2-1-------
expected      1------1----2--1--2----1---2-----2-1-------

Расширение до n значений

Я думаю, что это будет работать, когда набор значений в потоке заранее неизвестен (или имеет большой диапазон, поэтому расширение предыдущего ответа нецелесообразно).

Это должно работать, пока источник завершает.

merge(
  source.pipe(
    distinct().pipe(
      mapTo(distinctVal => source.pipe( 
        filter(val = val === distinctVal), 
        throttle(val => interval(ms))
      )
    )  
  )
)

У меня еще нет доказательств, я опубликую это позже.

Вне моей головы, вы хотите буферизовать по временному интервалу, а затем различаться в каждом буфере.

Фактически вы хотите перезапускать / перезагружать отдельный прогон каждые n миллисекунд.

source.pipe(
  bufferTime(ms),
  mergeMap(bufferArray => from(bufferArray).pipe(distinctUntilChanged()) )
)

Вот сложное решение, основанное на теории операторов, но я не уверен, что оно действительно работает, потому что сначала мне нужно смоделировать излучение источника.

Таким образом, для дросселя и отдельного потока всегда кэшируется последнее значение, zip следит за тем, чтобы они всегда излучались в паре, zip всегда будет излучать, когда какой-либо поток излучает, потому что это shareReplay(1).

Мы всегда берем значение emit из ifiveStream, даже когда поток zip запускается с помощью throttle, потому что ProperStream всегда имеет последнее кэшированное значение.

const throttleStream= source.pipe(throttle(val => interval(4000)),shareReplay(1))
const distinctStream= source.pipe(distinctUntilChanged(),shareReplay(1))
zip(throttleStream,distinctStream).pipe(
   map((t,d)=>d)
)
Другие вопросы по тегам