Подписчик onNext вызывается до завершения асинхронных запросов в rxjava2.

Я реализовал шаблон хранилища в MVP, используя RxJava2

RemoteDataSource.java

public Observable<List<A>> getAList(){
          return ApiService.
                getAList()
                .compose(RxUtils.applySchedulers())
                .doOnSubscribe(disposable -> Timber.d(..))
                .doOnError(throwable -> Timber.d(..))
                .doOnComplete(() -> {
                    Timber.d(..);
                });
      }

LocalDataSource.java

public Observable<List<A>> getAList(){
    return mDbHelper .....from SQLBrite..
}

public void saveAList(List<<A> a){
    SQlBriteTransaction...
}

Repository.java (Обновление)

  @Inject
  public Repository(DownloadUtils downloadUtils){
   this.mDownloadUtils = downloadUtils;
   }

   @Override
   public Observable<List<A>> getAList(){
     return  mRemoteDataSource
            .getAList()
             .flatMapIterable(List<A> -> a)
            .flatMap(A a ->
      ************************************************************   
               return Observable.fromIterable(a.getB())
                     .flatMap((Function<B, ObservableSource<B>>) b ->
                                       Observable.create(emitter -> 
                                                emitter.onNext(new 
            DownloadUtils().downloadFiles(b,totalListCount,emitter))))
                                .toList()
                                .toObservable()

     ***************************************************
                    .toList()
                    .toObservable()
                    .doOnNext( List<A>  a -> {

     --------------Only the first change in B value is inserted in Db-
                       mLocalDataSource.saveAList(a);
                    });
    }

DownLoadUtils.java (обновление)

void downloadBFiles(B b, int totalCount,ObservableEmitter<B> emitter){
        fileCount = b.size;
         b.get(index).setDataToChange(dataToChange);

         *** I am using PR Downloader for aynchronous download using 
            RECURSION **
        PRDownloader.download(remoteUrl, filePath, fileName)
                .build()
        .setOnStartOrResumeListener(() -> {
            })
            .setOnProgressListener(progress -> {
                int progressPercent = (int) (progress.currentBytes * 
                   100 / progress.totalBytes);,

             })
            .start(new OnDownloadListener() {
                @Override
                public void onDownloadComplete() {
  ********************* emitter.onComplete() ******************


             @Override
                public void onError(Error error) {
                  }   
     }

Presenter.java

void getVideosFromRepo(){

    disposable = mRepository
                 .getAList()
                 .doOnSubscribe(d _-> "Started Loading")
                 .subscribe(
                   //OnNext
    ------------- Here the OnNext is being called before Asynchronous Operation completes!!-------

                    List<A> a -> mView.setAList(a);
                   )


}

Выше реализации презентатора возвращает список в onNext презентатора еще до завершения асинхронной загрузки... какие изменения необходимы, чтобы onNext(подписка) вызывалась после завершения загрузки.!!!

1 ответ

Вы используете асинхронные сервисы вне цепочки наблюдателей RxJava, поэтому RxJava не может управлять передаваемыми данными. поскольку downloadBFiles() использует отдельную цепочку наблюдателей, вы потеряли нить, так сказать.

Вместо того, чтобы использовать doOnNext() чтобы запустить загрузку, вам нужно будет использовать flatMap() так что результат загрузки будет включен в вашу цепочку наблюдателей.

Другие вопросы по тегам