Как ограничить внутренние подписки mergeMap N последней или очередью скользящего окна

У меня есть исходный поток, объединенный из двух потоков. Когда исходный поток испускает событие, я хотел бы вызвать функцию подпискиMeteor.subscribe и держи его открытым, поэтому я использую mergeMap. Когда подписка готова, я перехожу к другомуmergeMapдля заполнения данных. Он работает хорошо, пока я не делаю 100 кликов, а потребление памяти стремительно растет. Вопрос в том, как можно ограничить mergeMap не первыми N подписками наconcurrent: Number, а к N недавним, как скользящее окно?

function paginationCache$(): Observable<any> {

    return merge(this.pageParamsChanged$, this.routerParamsChanged$)
        .pipe(
            mergeMap((newParams) => {
                // First merge map subscribes to data and un subscribes when second merge map unsubscribes
                return Observable.create((observer: Subscriber<any>) => {

                    let subscription = Meteor.subscribe('my/best/data/sub', newParams,
                        () => {
                            observer.next(subscription);
                            observer.complete();
                        });
                });
            }),
            mergeMap((subscription: any) => {
                // second subscription is just populating the data

                return Observable.create((observer: Subscriber<Meteor.Error | any>) => {

                    const collection = new Mongo.Collection(subscription.collectionName);

                    const { selector, options } = this.mongoParams();

                    collection.find(selector, options).dataChanges((data) => {
                        observer.next({ data });
                    });

                    return () => {
                        subscription.stop();
                    };
                });

            })
        );
}

Я хотел бы более подробно объяснить, что происходит в этом коде.

В моем примере исходный поток (mergeперед трубкой) он никогда не завершается, пока я нажимаю кнопку в своем веб-интерфейсе, поэтому он генерирует изменения при нажатии следующей или предыдущей кнопки в своем интерфейсе. Первый mergeMapполучает изменения из исходного потока и отправляет их в серверный API (который также имеет конфликтную публикацию / подписку по именованию). Поэтому, когда данные о клиенте доступны, я звонюobserver.next(subscription) перейти ко второму mergeMap, но я не могу уничтожить или остановить подписку Meteor. Две причины: 1. Я хочу получать изменения выбранных данных в реальном времени. 2. Если я прекращу подписку на Meteor, данные на стороне клиента будут удалены. Итак, теперь второе mergeMap он постоянно обновляет выбранные данные, если они были обновлены на сервере.

Итак, после каждого нажатия кнопки пользовательского интерфейса (следующий, предыдущий) у меня появляется новая цепочка подписок. Ничего страшного, если исходная таблица данных невелика (1000 записей), и я просто щелкнул пару раз. Но у меня может быть больше 30000, и я могу нажимать кнопки много раз.

Итак, идея состоит в том, чтобы сделать mergeMap похожей на очередь ограниченного размера, которая содержит только последние N подписок, но очередь меняется все время, когда я нажимаю кнопку.

ПОСЛЕДНИЙ РЕДАКТИРОВАНИЕ: рабочий код:

function paginationCache$(): Observable<any> {
    const N = 3;
    const subscriptionsSubject = new Subject();
    return merge(this.pageParamsChanged$, this.routerParamsChanged$)
        .pipe(
            mergeMap((newParams) => {
                // First merge map subscribes to data and un subscribes when second merge map unsubscribes

                subscriptionsSubject.next();

                return Observable.create((observer: Subscriber<any>) => {

                    let subscription = Meteor.subscribe('mu/best/data/sub', newParams,
                        () => {
                            observer.next(subscription);
                            observer.complete();
                        });
                });
            }),
            mergeMap((subscription: any) => {
                // second subscription is just populating the data

                return Observable.create((observer: Subscriber<Meteor.Error | any>) => {

                    const collection = new Mongo.Collection(subscription.collectionName);
                    const { selector, options } = this.mongoParams();

                    collection.find(selector, options).dataChanges((data) => {
                        observer.next({ data });
                    });

                    return () => {
                        subscription.stop();
                    };
                }).pipe(
                    takeUntil(subscriptionsSubject
                        .pipe(
                            take(N),
                            filter((_, idx) => idx === N - 1)
                        )
                    )
                );
            })
        );
}

2 ответа

Решение

Не считая вашего фрагмента, я бы сделал следующее:

не на первые N подписок по concurrent: Number, а на N последних, как скользящее окно

Если я правильно понял, вам нужно что-то вроде этого (при условии N = 3):

N = 3

Crt             |  1 |  2 |  3 |
Subscriptions   | S1 | S2 | S3 |


When Crt = 4

Crt           | 2  | 3  |  4 |
Subscriptions | S2 | S3 | S4 |

Если это так, вот как я бы решил это:

const subscriptionsSubject = new Subject();

src$.pipe(
  mergeMap(
    data => (new Observable(s => {/* ... */ subscriptionsSubject.next(null) /* Notify about a new subscription when it's the case */ }))
      .pipe(
        takeUntil(
          subscriptionsSubject.pipe(
            take(N), // After `N` subscriptions, it will complete
            filter((_, idx) => idx === N - 1) // Do not want to complete immediately, but only when exactly `N` subscriptions have been reached
          )
        )
      )
  )
)

У меня есть две идеи:

  1. Вы не завершаете второй внутренний Observable. Я думаю, это не должно быть источником вашей проблемы, но лучше дополнить наблюдателей, если вы можете:

    return () => {
      subscription.stop();
      observer.complete();
    };
    
  2. Вы можете использовать bufferCount сделать скользящее окно Observables, а затем подписаться на них с помощью switchMap(). Что-то в этом роде:

    import { of, range } from 'rxjs'; 
    import { map, bufferCount, switchMap, shareReplay, tap } from 'rxjs/operators';
    
    range(10)
      .pipe(
        // turn each value to an Observable
        // `refCount` is used so that when `switchMap` switches to a new window
        // it won't trigger resubscribe to its sources and make more requests.
        map(v => of(v).pipe(shareReplay({ refCount: false, bufferSize: 1 }))),
        bufferCount(3, 1),
        tap(console.log), // for debugging purposes only
        switchMap(sourcesArray => merge(...sourcesArray)),
      )
      .subscribe(console.log);
    

    Живая демонстрация: https://stackblitz.com/edit/rxjs-kuybbs?devtoolsheight=60

    Я не совсем уверен, что это имитирует ваш вариант использования, но я попытался включить также shareReplay чтобы он не запускал несколько Meteor.subscribeтребует того же Observable. Мне нужно было бы иметь рабочую демонстрацию вашего кода, чтобы протестировать его сам.

Другие вопросы по тегам