RXJava2 управлять подписками
Мне нужно уточнить, каков наилучший подход к управлению конкретным сценарием с использованием RxJava2 (на этом основана вся структура приложения):
В моем приложении многие люди могут вносить изменения в один и тот же документ, поэтому я должен предоставить каждое изменение всем, кто просматривает документ. Но этот объект очень сложный и тяжелый, поэтому мне нужно удалить его из памяти, когда последний человек закроет его. И еще: документ может быть дочерним по отношению к другому документу, поэтому каждое изменение в родительском документе должно быть отправлено всем дочерним элементам.
Что я сделал до сих пор: я создал менеджера, чтобы каждый запрос на документ приходил к нему. Всякий раз, когда кому-то нужно работать над документом, я смотрю на карте, если документ уже открыт. Если это не так, я создаю экземпляр BaseDocument, который получает данные из документа и объект PublishSubject для распространения событий и добавления к этой карте. Затем я подписываю пользователя Observer на PublishSubject для получения изменений. Всякий раз, когда пользователю нужно что-то изменить, он отправляет изменение в BaseDocument, он вносит это изменение и отправляет новую версию через onNext() всем. Все идет нормально.
Моя проблема в том, что я не могу контролировать, когда кто-то удаляет обозреватель документа, поэтому я не могу контролировать, когда документ больше не нужен, поэтому я могу сохранить любые несохраненные изменения и уничтожить объект. Я не могу найти какой-либо список подписки или что-то подобное, кроме "hasObservers()", и я не хочу добавлять таймер для опроса, чтобы закрыть его, если все будет сделано.
Моим "чудесным ответом" будет обратный вызов, который вызывается, когда последний подписчик избавляется, так что я могу просто убрать дом и выбросить весь объект, но я не могу найти ничего подобного. Итак, как я могу управлять подписками?
1 ответ
Один из подходов заключается в подсчете количества подписчиков и подписчиков. И если число достигает 0, удалите документ. Это будет выглядеть примерно так:
int numberOfSubscribers = 0;
...
public Observable<T> expose(){
return subject.asObservable()
.doOnSubscribe(()-> numberOfSubscribers++)
.doOnDispose(()-> {
numberOfSubscribers--;
if (numberOfSubscribers == 0){
//remove the object
}
});
Конечно, вам нужно добавить поддержку проблемы параллелизма здесь (synchronized/atomic int), это всего лишь черновик.
Надеюсь это поможет:)