Плоская карта RxJava: что происходит, когда завершается одно из наблюдаемых результатов?

Я новичок в RxJava, я знаю, что плоские карты предназначены для отображения испускаемого элемента на наблюдаемый. Я также знаю, что на основании документации все излучаемые наблюдаемые объединяются (сглаживаются) в один наблюдаемый поток.

Мне было интересно, что произойдет, если какая-либо из этих внутренних наблюдаемых будет завершена?

например: у меня есть наблюдаемое, которое испускает ключ данных элемента. Мне нужно сделать еще один асинхронный http-вызов, чтобы получить данные об элементе с сервера, поэтому я вызываю его с помощью другого наблюдаемого. Я использую плоскую карту, чтобы соединить эти два и создать одну основную наблюдаемую.

Когда вызывается метод run() следующих "SomeMethodThatWantsItems"?

public void someMethodThatWantsItems(MyHttpCaller httpCaller, SomeSearchEngine searchEngine)
{
    Consumer<Item> onNextConsumer = 
    Observable<Item> searchObservable = getSearchResult(httpCaller, searchEngine, "The Search Word");
    searchObservable
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Consumer<Item>(){
                           @Override
                           public void accept(@NonNull Item item) throws Exception {
                               //Do stuff with the item
                           }
                       }
                , new Consumer<Exception>() { //some implementation of onErrorConsumer
                    }
                 //OnComplete
                , new Action(){

                        @Override
                        public void run() throws Exception {
                            //When does this get called??? after the search complete or when the first http call is successful? 
                        }
                    });

}

private Observable<String> getSearchResultKeys(SomeSearchEngine searchEngine, String someSearchWord)
{
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<String> emitter) throws Exception {

            //Assume that our search engine call onFind everytime it finds something
            searchEngine.addSearchListener(new searchEngineResultListener(){
                @Override
                public void onFind(String foundItemKey){
                    emitter.onNext(foundItemKey);
                }

                @Override
                public void onFinishedFindingResults(){
                    emitter.onComplete();
                }
            });

        }
    });
}

private Observable<Item> getItemByKey(MyHttpCaller httpCaller, String key)
{

    return Observable.create(new ObservableOnSubscribe<Item>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<Item> emitter) throws Exception {

            //Call the server to get the item
            httpCaller.call(key, new onCompleteListener(){
                @Override
                public void onCompletedCall(Item result)
                {
                    emitter.onNext(result);
                    //The result is complete! end the stream
                    emitter.onComplete();
                }
            });
        }
    });
}

public Observable<Item> getSearchResult(MyHttpCaller httpCaller, SomeSearchEngine searchEngine, String someSearchWord){
    //Where everything comes together
    Observable<String> searchResultObservable = getSearchResultKeys(searchEngine, someSearchWord);
    retuern searchResultObservable
            .observeOn(Schedulers.newThread())
            .flatMap(new Function<String, Observable<Item>>(){
                @Override
                public Observable<Item> apply(String key){
                    return getItemByKey(httpCaller, key);
                }
            });
}

1 ответ

Решение

onComplete() всегда звоните один раз, и тогда потоки прекращаются. (это часть наблюдаемого контракта).
Это означает, что в вашем случае ваш onComplete() в SomeMethodThatWantsItems будет вызван после того, как все предметы были найдены.
В случае flatMap(), завершение каждого внутреннего ObservableБуду просто сигнализировать об источнике Observable прекратить выкладывание предмета изнутри Observable к источнику Observable, flatMap() объединяет предметы изнутри Observable пока этот поток отправляет элементы, так что он в основном потребляет весь внутренний Observable поток в исходный поток, весь поток до события завершения 3, как onComplete(), так что в случае, когда внутренний Observable может испускать более 1 элемента, это означает, что он будет производить более 1 излучения в исходном потоке.

Другие вопросы по тегам