PublishSubject вызывает дорогостоящую функцию для всех наблюдателей
У меня есть конкретный сценарий, в котором я реализовал PublishSubject для создания элементов на основе настраиваемого события. Для каждого элемента, который будет отправлен, мне также нужно сохранить это значение (дорогостоящая операция). Я пытаюсь достичь функции (например,map
), который будет вызываться один раз для всех наблюдателей, а затем элемент будет получен каждым наблюдателем через onNext()
метод.
Тема:
static final PublishSubject<SomeResult> commonSubject = PublishSubject.create()
Триггер (испускающие предметы):
commonSubject.onNext(new SomeResult())
Выставляем тему (будет использоваться контроллером):
public static Observable<SomeResult> observeResults() {
return commonSubject.share();
}
Контроллер:
public Observable<SomeResult> observeResults() {
return CustomConsumer.observeResults()
.observeOn(Schedulers.single());
}
Подписчики:
CustomControllerResult.observeResults().subscribe(result -> doSomething());
CustomControllerResult.observeResults().subscribe(result -> doSomethingElse());
Каждый Observer получает элементы, как и ожидалось, но если я добавлю дорогостоящую операцию к контроллеру, она будет вызываться для каждого наблюдателя (чего я не хочу):
public Observable<SomeResult> observeResults() {
return CustomConsumer.observeResults()
.observeOn(Schedulers.single())
.compose(persistResult())
.compose(logResult())
.share();
}
Есть идеи, как добиться желаемого результата?
1 ответ
Проблема в том, что каждый раз observeResults()
называется, он создает новый Observable с share
оператор. Но созданный Observable не передается подписчикам.
Вы можете изменить свой код на:
Observable<SomeResult> observable = CustomControllerResult.observeResults()
observable.subscribe(result -> doSomething());
observable.subscribe(result -> doSomethingElse());
Или вы можете изменить observeResults
метод для возврата общего Observable:
static final PublishSubject<SomeResult> commonSubject = PublishSubject.create()
static final Observable<SomeResult> observable = commonSubject
.observeOn(Schedulers.single())
.compose(persistResult())
.compose(logResult())
.share();
public static Observable<SomeResult> observeResults() {
return observable;
}