RxAndroid: onCompleted не вызывается
Я использую библиотеки rx в моем приложении, чтобы вызвать REST API на моем сервере и показать результаты на экране. Я также следую шаблону дизайна MVP. Так что у меня есть классы Presenter и Interactor. В MainInteractor.java у меня есть следующий метод:
public Observable<Card> fetchCard(final String clientId, final CardFetchedListener listener) {
Log.i(TAG, "FetchCard method");
// Manipulate the observer
return CARDS
.doOnCompleted(new Action0() {
@Override
public void call() {
Log.d(TAG, "CARDS Completed");
}
})
.flatMap(new Func1<Card, Observable<Card>>() {
@Override
public Observable<Card> call(final Card card) {
return ResourceClient.getInstance(card)
.getIDCard()
.observeOn(AndroidSchedulers.mainThread())
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.w(TAG, "interactor -> fetchCard 2", throwable);
}
}
})
.flatMap(new Func1<CardMeta, Observable<Card>>() {
@Override
public Observable<Card> call(CardMeta cardMeta) {
card.setCardMeta(cardMeta);
saveOrUpdateCardToTheDb(card);
return Observable.just(card);
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
Log.d(TAG, "Completed body");
}
});
}
});
}
В журналах я вижу строку "Завершенное тело".
Вышеуказанный метод вызывается классом MainPresenter.java следующим образом:
interactor.fetchCard(clientId, this)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Card>() {
@Override
public void onCompleted() {
Log.i(TAG, "fetchCard onCompleted");
view.hideProgressDialog();
view.updateCardsAdapter(cards);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "Fetch Card error ", e);
onFailure(parseThrowable(e));
}
@Override
public void onNext(Card card) {
if (card != null) {
Log.i(TAG, card.getTenant() + " was fetched and will be displayed");
}
}
});
Проблема в том, что метод onCompleted в классе Presenter никогда не вызывается. Я попытался вызвать onCompleted самостоятельно, и это сработало, но проблема в том, что я не знаю, когда наблюдаемая закончила испускать карты.
Что я здесь не так делаю?
ОБНОВИТЬ
КАРТЫ также являются наблюдаемыми, которые содержат метаинформацию. Инициализируется с помощью
Observable.from(tenants)
.filter(...).flatMap(// I'm using create operator here and it is calling its onCompleted method successflly);