RXJava2 управлять подписками

Мне нужно уточнить, каков наилучший подход к управлению конкретным сценарием с использованием RxJava2 (на этом основана вся структура приложения):

В моем приложении многие люди могут вносить изменения в один и тот же документ, поэтому я должен предоставить каждое изменение всем, кто просматривает документ. Но этот объект очень сложный и тяжелый, поэтому мне нужно удалить его из памяти, когда последний человек закроет его. И еще: документ может быть дочерним по отношению к другому документу, поэтому каждое изменение в родительском документе должно быть отправлено всем дочерним элементам.

Что я сделал до сих пор: я создал менеджера, чтобы каждый запрос на документ приходил к нему. Всякий раз, когда кому-то нужно работать над документом, я смотрю на карте, если документ уже открыт. Если это не так, я создаю экземпляр BaseDocument, который получает данные из документа и объект PublishSubject для распространения событий и добавления к этой карте. Затем я подписываю пользователя Observer на PublishSubject для получения изменений. Всякий раз, когда пользователю нужно что-то изменить, он отправляет изменение в BaseDocument, он вносит это изменение и отправляет новую версию через onNext() всем. Все идет нормально.

Моя проблема в том, что я не могу контролировать, когда кто-то удаляет обозреватель документа, поэтому я не могу контролировать, когда документ больше не нужен, поэтому я могу сохранить любые несохраненные изменения и уничтожить объект. Я не могу найти какой-либо список подписки или что-то подобное, кроме "hasObservers()", и я не хочу добавлять таймер для опроса, чтобы закрыть его, если все будет сделано.

Моим "чудесным ответом" будет обратный вызов, который вызывается, когда последний подписчик избавляется, так что я могу просто убрать дом и выбросить весь объект, но я не могу найти ничего подобного. Итак, как я могу управлять подписками?

1 ответ

Решение

Один из подходов заключается в подсчете количества подписчиков и подписчиков. И если число достигает 0, удалите документ. Это будет выглядеть примерно так:

int numberOfSubscribers = 0;

...

public Observable<T> expose(){
    return subject.asObservable()
        .doOnSubscribe(()-> numberOfSubscribers++)
        .doOnDispose(()-> {
            numberOfSubscribers--;
            if (numberOfSubscribers == 0){
              //remove the object
             }
        });

Конечно, вам нужно добавить поддержку проблемы параллелизма здесь (synchronized/atomic int), это всего лишь черновик.

Надеюсь это поможет:)

Другие вопросы по тегам