Будет ли Rx.Observable.groupBy очищать пустые потоки?

В приложении Node я пытаюсь обработать поток событий, используя RxJS. Поток событий представляет собой список изменений многих документов. Я использую groupBy для разделения потока на новые потоки по documentId. Но мне интересно, как только документ закрывается на клиенте, и в поток для этого documentId не добавляются новые события, будет ли groupBy распоряжаться потоком этого документа, когда он пуст? Если нет, то как бы я сделал это вручную? Я хочу избежать утечки памяти, вызванной тем, что новые потоки документов создаются, но никогда не уничтожаются.

2 ответа

Что бы я предложил сделать:

вместо того, чтобы просто наблюдать объект documentChanges, нужно наблюдать объект documentEvents.

Клиенты будут отправлять события documentOpened при открытии документа, события documentChanged при изменении документа и события documentClosed при закрытии документа.

Отправляя все 3 типа событий через одну и ту же наблюдаемую, вы устанавливаете и гарантируете заказ. Если клиент отправляет события documentOpened, documentChanged, documentClosed в этом порядке, то ваш сервер увидит их в этом порядке. Обратите внимание, что не будет никаких гарантий относительно порядка событий, отправленных двумя разными клиентами. Это позволит вам убедиться, что события, отправленные конкретным клиентом, будут в порядке.

А потом, вот как бы вы использовали groupByUntil:

documentEvents
    .groupByUntil(
        function (e) { return e.documentId; }, // key
        null, // element
        function (group) { // duration selector
            var documentId = group.key;
            return group.filter(function (e) { return e.eventType === 'documentClosed'; });
      })
    .flatMap(function (eventsForDocument) {
        var documentId = eventsForDocument.key;
        return eventsForDocument.whatever(...);
    })
    .subscribe(...);

Еще один вариант, который намного проще: вы можете просто прекратить действие группы после периода простоя. В зависимости от того, что вы делаете с событиями, этого может быть более чем достаточно. В этом примере истекает срок действия группы, если документ не редактировался в течение 5 минут. Если поступает больше правок, появляется новая группа.

var idleTime = 5 * 60 * 1000;
events
    .groupByUntil(
        function(e) { return e.documentId; },
        null,
        function(g) { return g.debounce(idleTime); })
    .flatMap...

Поскольку вы включили тег.NET, я расскажу и о Rx.NET.

Ваш вопрос сформулирован немного неправильно. Потоки пусты, если и только если они никогда не имеют события. Таким образом, они не могут стать пустыми. Однако поток, который не передает данные, обычно не потребляет много ресурсов.

В.NET группы не прерываются, пока не прекратится источник. Мы используем 'GroupByUntil`, который позволяет вам указать поток durationSelector для каждой группы. Observable.Timer часто хорошо работает для этого.

Это означает, что вы можете получить несколько непараллельных потоков с одним и тем же ключом, появляющимся со временем, но если (как это часто бывает) ваши групповые потоки в какой-то момент сглаживаются, это не будет иметь значения.

В rxjs у нас также есть groupByUntil.

В Rx-Java метод groupByUntil, который вел себя аналогично, был свернут в groupBy - см. https://github.com/ReactiveX/RxJava/pull/1727 и https://github.com/benjchristensen/RxJava/commit/b9302956832e3e77579f63fd9db25aa60eb4192a Больше подробностей.

http://reactivex.io/documentation/operators/groupby.html говорит:

Если вы отмените подписку на одну из GroupedObservables, эта GroupedObservable будет прервана. Если исходная Observable позже испускает элемент, ключ которого совпадает с GroupedObservable, который был прерван таким образом, groupBy создаст и выпустит новый GroupedObservable, соответствующий ключу.

Таким образом, в Rx-Java вы должны отписаться от сгруппированного наблюдаемого потока, чтобы завершить его. takeUntil с timer поток может работать для этого.

Приложение:

В ответ на ваш комментарий поток не будет прерван, пока нижестоящий оператор не откажется от него. Селектор длительности groupByUntil вызовет завершение. Если документ не будет снова открыт после закрытия, вы можете просто отправить событие "documentclosed" в поток и использовать обычный groupBy с тестом takeWhile для "documentClosed".

Причина, по которой важно, чтобы документ не открывался снова, заключается в том, что с groupBy (в rx-js и rx.net) новая группа не будет создана, если вновь появится уже увиденный ключ.

Если это проблема, вам нужно будет использовать groupByUntil и использовать опубликованный поток для отслеживания события documentClosed - использование опубликованного потока гарантирует, что вы не получите побочных эффектов подписки.

Другие вопросы по тегам