PublishSubject вызывает дорогостоящую функцию для всех наблюдателей

У меня есть конкретный сценарий, в котором я реализовал PublishSubject для создания элементов на основе настраиваемого события. Для каждого элемента, который будет отправлен, мне также нужно сохранить это значение (дорогостоящая операция). Я пытаюсь достичь функции (например,map), который будет вызываться один раз для всех наблюдателей, а затем элемент будет получен каждым наблюдателем через onNext() метод.

Тема:

static final PublishSubject<SomeResult> commonSubject = PublishSubject.create()

Триггер (испускающие предметы):

commonSubject.onNext(new SomeResult())

Выставляем тему (будет использоваться контроллером):

public static Observable<SomeResult> observeResults() {
    return commonSubject.share();
}

Контроллер:

public Observable<SomeResult> observeResults() {
    return CustomConsumer.observeResults()
            .observeOn(Schedulers.single());
}   

Подписчики:

CustomControllerResult.observeResults().subscribe(result -> doSomething());
CustomControllerResult.observeResults().subscribe(result -> doSomethingElse());

Каждый Observer получает элементы, как и ожидалось, но если я добавлю дорогостоящую операцию к контроллеру, она будет вызываться для каждого наблюдателя (чего я не хочу):

public Observable<SomeResult> observeResults() {
    return CustomConsumer.observeResults()
            .observeOn(Schedulers.single())
            .compose(persistResult())
            .compose(logResult())
            .share();
}

Есть идеи, как добиться желаемого результата?

1 ответ

Решение

Проблема в том, что каждый раз observeResults() называется, он создает новый Observable с shareоператор. Но созданный Observable не передается подписчикам.

Вы можете изменить свой код на:

Observable<SomeResult> observable = CustomControllerResult.observeResults()
observable.subscribe(result -> doSomething());
observable.subscribe(result -> doSomethingElse());

Или вы можете изменить observeResults метод для возврата общего Observable:

static final PublishSubject<SomeResult> commonSubject = PublishSubject.create()

static final Observable<SomeResult> observable = commonSubject
    .observeOn(Schedulers.single())
    .compose(persistResult())
    .compose(logResult())
    .share();

public static Observable<SomeResult> observeResults() {
    return observable;
}
Другие вопросы по тегам