Плоская карта RxJava: что происходит, когда завершается одно из наблюдаемых результатов?
Я новичок в RxJava, я знаю, что плоские карты предназначены для отображения испускаемого элемента на наблюдаемый. Я также знаю, что на основании документации все излучаемые наблюдаемые объединяются (сглаживаются) в один наблюдаемый поток.
Мне было интересно, что произойдет, если какая-либо из этих внутренних наблюдаемых будет завершена?
например: у меня есть наблюдаемое, которое испускает ключ данных элемента. Мне нужно сделать еще один асинхронный http-вызов, чтобы получить данные об элементе с сервера, поэтому я вызываю его с помощью другого наблюдаемого. Я использую плоскую карту, чтобы соединить эти два и создать одну основную наблюдаемую.
Когда вызывается метод run() следующих "SomeMethodThatWantsItems"?
public void someMethodThatWantsItems(MyHttpCaller httpCaller, SomeSearchEngine searchEngine)
{
Consumer<Item> onNextConsumer =
Observable<Item> searchObservable = getSearchResult(httpCaller, searchEngine, "The Search Word");
searchObservable
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<Item>(){
@Override
public void accept(@NonNull Item item) throws Exception {
//Do stuff with the item
}
}
, new Consumer<Exception>() { //some implementation of onErrorConsumer
}
//OnComplete
, new Action(){
@Override
public void run() throws Exception {
//When does this get called??? after the search complete or when the first http call is successful?
}
});
}
private Observable<String> getSearchResultKeys(SomeSearchEngine searchEngine, String someSearchWord)
{
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> emitter) throws Exception {
//Assume that our search engine call onFind everytime it finds something
searchEngine.addSearchListener(new searchEngineResultListener(){
@Override
public void onFind(String foundItemKey){
emitter.onNext(foundItemKey);
}
@Override
public void onFinishedFindingResults(){
emitter.onComplete();
}
});
}
});
}
private Observable<Item> getItemByKey(MyHttpCaller httpCaller, String key)
{
return Observable.create(new ObservableOnSubscribe<Item>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<Item> emitter) throws Exception {
//Call the server to get the item
httpCaller.call(key, new onCompleteListener(){
@Override
public void onCompletedCall(Item result)
{
emitter.onNext(result);
//The result is complete! end the stream
emitter.onComplete();
}
});
}
});
}
public Observable<Item> getSearchResult(MyHttpCaller httpCaller, SomeSearchEngine searchEngine, String someSearchWord){
//Where everything comes together
Observable<String> searchResultObservable = getSearchResultKeys(searchEngine, someSearchWord);
retuern searchResultObservable
.observeOn(Schedulers.newThread())
.flatMap(new Function<String, Observable<Item>>(){
@Override
public Observable<Item> apply(String key){
return getItemByKey(httpCaller, key);
}
});
}
1 ответ
onComplete()
всегда звоните один раз, и тогда потоки прекращаются. (это часть наблюдаемого контракта).
Это означает, что в вашем случае ваш onComplete()
в SomeMethodThatWantsItems
будет вызван после того, как все предметы были найдены.
В случае flatMap()
, завершение каждого внутреннего Observable
Буду просто сигнализировать об источнике Observable
прекратить выкладывание предмета изнутри Observable
к источнику Observable
, flatMap()
объединяет предметы изнутри Observable
пока этот поток отправляет элементы, так что он в основном потребляет весь внутренний Observable
поток в исходный поток, весь поток до события завершения 3, как onComplete()
, так что в случае, когда внутренний Observable
может испускать более 1 элемента, это означает, что он будет производить более 1 излучения в исходном потоке.