onCompleted вызывается раньше, чем обрабатывается RxJava concatMap
У меня есть две наблюдаемые и использовал concatDelayError для последовательной обработки.
Моя проблема в том, что onNext и onCompleted вызываются до обработки. Как я узнаю, что вся обработка завершена с помощью concatDelayError?
Псудо-код:
public Observable<Integer> concat(){
int x = 10;
int y = 20;
Observable obx = Observable.create(emitter -> {
try {
int x = doSomeThing();
emitter.onNext(x);
emitter.onCompleted();
} catch (SQLiteException e) {
emitter.onError(e);
}
}, Emitter.BackpressureMode.BUFFER);
Observable oby = Observable.create(emitter -> {
try {
int y = doSomeThing();
emitter.onNext(y);
emitter.onCompleted();
} catch (SQLiteException e) {
emitter.onError(e);
}
}, Emitter.BackpressureMode.BUFFER);
Observable concated = Observable.concatDelayError(ob1,ob2)
.compose(applySchedulers())
.replay().autoConnect();
}
//somewhere else
concat().subscribe(mReplaySubject);
//somewhere else
mReplaySubject.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
launchActivity(SplashActivity.this, HomeActivity.class);
SplashActivity.this.finish();
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer value) {
}
});
Я использую reply() с autoConnect() и подписываюсь через ReplySubject, так как мне нужно поделиться одной подпиской.
1 ответ
Решение
Ты можешь использовать doOnSubscribe
до replay().autoConnect()
, но вам также нужен способ убедиться, что возвращенный Observable
устанавливается не более одного раза:
final class OnceObservable {
final AtomicReference<Observable<Integer>> ref = new AtomicReference<>();
int x;
int y;
Observable<Integer> concat() {
Observable<Integer> obs = ref.get();
if (obs != null) {
return;
}
obs = Observable.concatDelayError(
Observable.fromCallable(() -> doSomething()),
Observable.fromCallable(() -> doSomethingElse())
)
.doOnSubscribe(() -> {
x = 10;
y = 20;
})
.replay().autoConnect();
if (!ref.compareAndSet(null, obs)) {
obs = ref.get();
}
return obs;
}
}