Как ограничить внутренние подписки mergeMap N последней или очередью скользящего окна
У меня есть исходный поток, объединенный из двух потоков. Когда исходный поток испускает событие, я хотел бы вызвать функцию подпискиMeteor.subscribe
и держи его открытым, поэтому я использую mergeMap
. Когда подписка готова, я перехожу к другомуmergeMap
для заполнения данных. Он работает хорошо, пока я не делаю 100 кликов, а потребление памяти стремительно растет. Вопрос в том, как можно ограничить mergeMap не первыми N подписками наconcurrent: Number
, а к N недавним, как скользящее окно?
function paginationCache$(): Observable<any> {
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('my/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
});
})
);
}
Я хотел бы более подробно объяснить, что происходит в этом коде.
В моем примере исходный поток (merge
перед трубкой) он никогда не завершается, пока я нажимаю кнопку в своем веб-интерфейсе, поэтому он генерирует изменения при нажатии следующей или предыдущей кнопки в своем интерфейсе. Первый mergeMap
получает изменения из исходного потока и отправляет их в серверный API (который также имеет конфликтную публикацию / подписку по именованию). Поэтому, когда данные о клиенте доступны, я звонюobserver.next(subscription)
перейти ко второму mergeMap
, но я не могу уничтожить или остановить подписку Meteor. Две причины: 1. Я хочу получать изменения выбранных данных в реальном времени. 2. Если я прекращу подписку на Meteor, данные на стороне клиента будут удалены. Итак, теперь второе mergeMap
он постоянно обновляет выбранные данные, если они были обновлены на сервере.
Итак, после каждого нажатия кнопки пользовательского интерфейса (следующий, предыдущий) у меня появляется новая цепочка подписок. Ничего страшного, если исходная таблица данных невелика (1000 записей), и я просто щелкнул пару раз. Но у меня может быть больше 30000, и я могу нажимать кнопки много раз.
Итак, идея состоит в том, чтобы сделать mergeMap похожей на очередь ограниченного размера, которая содержит только последние N подписок, но очередь меняется все время, когда я нажимаю кнопку.
ПОСЛЕДНИЙ РЕДАКТИРОВАНИЕ: рабочий код:
function paginationCache$(): Observable<any> {
const N = 3;
const subscriptionsSubject = new Subject();
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
subscriptionsSubject.next();
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('mu/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
}).pipe(
takeUntil(subscriptionsSubject
.pipe(
take(N),
filter((_, idx) => idx === N - 1)
)
)
);
})
);
}
2 ответа
Не считая вашего фрагмента, я бы сделал следующее:
не на первые N подписок по concurrent: Number, а на N последних, как скользящее окно
Если я правильно понял, вам нужно что-то вроде этого (при условии N = 3
):
N = 3
Crt | 1 | 2 | 3 |
Subscriptions | S1 | S2 | S3 |
When Crt = 4
Crt | 2 | 3 | 4 |
Subscriptions | S2 | S3 | S4 |
Если это так, вот как я бы решил это:
const subscriptionsSubject = new Subject();
src$.pipe(
mergeMap(
data => (new Observable(s => {/* ... */ subscriptionsSubject.next(null) /* Notify about a new subscription when it's the case */ }))
.pipe(
takeUntil(
subscriptionsSubject.pipe(
take(N), // After `N` subscriptions, it will complete
filter((_, idx) => idx === N - 1) // Do not want to complete immediately, but only when exactly `N` subscriptions have been reached
)
)
)
)
)
У меня есть две идеи:
Вы не завершаете второй внутренний Observable. Я думаю, это не должно быть источником вашей проблемы, но лучше дополнить наблюдателей, если вы можете:
return () => { subscription.stop(); observer.complete(); };
Вы можете использовать
bufferCount
сделать скользящее окно Observables, а затем подписаться на них с помощьюswitchMap()
. Что-то в этом роде:import { of, range } from 'rxjs'; import { map, bufferCount, switchMap, shareReplay, tap } from 'rxjs/operators'; range(10) .pipe( // turn each value to an Observable // `refCount` is used so that when `switchMap` switches to a new window // it won't trigger resubscribe to its sources and make more requests. map(v => of(v).pipe(shareReplay({ refCount: false, bufferSize: 1 }))), bufferCount(3, 1), tap(console.log), // for debugging purposes only switchMap(sourcesArray => merge(...sourcesArray)), ) .subscribe(console.log);
Живая демонстрация: https://stackblitz.com/edit/rxjs-kuybbs?devtoolsheight=60
Я не совсем уверен, что это имитирует ваш вариант использования, но я попытался включить также
shareReplay
чтобы он не запускал несколькоMeteor.subscribe
требует того же Observable. Мне нужно было бы иметь рабочую демонстрацию вашего кода, чтобы протестировать его сам.