Подписчик onNext вызывается до завершения асинхронных запросов в rxjava2.
Я реализовал шаблон хранилища в MVP, используя RxJava2
RemoteDataSource.java
public Observable<List<A>> getAList(){
return ApiService.
getAList()
.compose(RxUtils.applySchedulers())
.doOnSubscribe(disposable -> Timber.d(..))
.doOnError(throwable -> Timber.d(..))
.doOnComplete(() -> {
Timber.d(..);
});
}
LocalDataSource.java
public Observable<List<A>> getAList(){
return mDbHelper .....from SQLBrite..
}
public void saveAList(List<<A> a){
SQlBriteTransaction...
}
Repository.java (Обновление)
@Inject
public Repository(DownloadUtils downloadUtils){
this.mDownloadUtils = downloadUtils;
}
@Override
public Observable<List<A>> getAList(){
return mRemoteDataSource
.getAList()
.flatMapIterable(List<A> -> a)
.flatMap(A a ->
************************************************************
return Observable.fromIterable(a.getB())
.flatMap((Function<B, ObservableSource<B>>) b ->
Observable.create(emitter ->
emitter.onNext(new
DownloadUtils().downloadFiles(b,totalListCount,emitter))))
.toList()
.toObservable()
***************************************************
.toList()
.toObservable()
.doOnNext( List<A> a -> {
--------------Only the first change in B value is inserted in Db-
mLocalDataSource.saveAList(a);
});
}
DownLoadUtils.java (обновление)
void downloadBFiles(B b, int totalCount,ObservableEmitter<B> emitter){
fileCount = b.size;
b.get(index).setDataToChange(dataToChange);
*** I am using PR Downloader for aynchronous download using
RECURSION **
PRDownloader.download(remoteUrl, filePath, fileName)
.build()
.setOnStartOrResumeListener(() -> {
})
.setOnProgressListener(progress -> {
int progressPercent = (int) (progress.currentBytes *
100 / progress.totalBytes);,
})
.start(new OnDownloadListener() {
@Override
public void onDownloadComplete() {
********************* emitter.onComplete() ******************
@Override
public void onError(Error error) {
}
}
Presenter.java
void getVideosFromRepo(){
disposable = mRepository
.getAList()
.doOnSubscribe(d _-> "Started Loading")
.subscribe(
//OnNext
------------- Here the OnNext is being called before Asynchronous Operation completes!!-------
List<A> a -> mView.setAList(a);
)
}
Выше реализации презентатора возвращает список в onNext презентатора еще до завершения асинхронной загрузки... какие изменения необходимы, чтобы onNext(подписка) вызывалась после завершения загрузки.!!!
1 ответ
Вы используете асинхронные сервисы вне цепочки наблюдателей RxJava, поэтому RxJava не может управлять передаваемыми данными. поскольку downloadBFiles()
использует отдельную цепочку наблюдателей, вы потеряли нить, так сказать.
Вместо того, чтобы использовать doOnNext()
чтобы запустить загрузку, вам нужно будет использовать flatMap()
так что результат загрузки будет включен в вашу цепочку наблюдателей.