Как можно условно добавить асинхронную операцию в середине потока RxJava?
Вот упрощенная версия того, что я пытаюсь сделать (используя Kotlin и RxJava)
makeServerCall()
.doOnNext {
doStuff(it)
}
//TODO: if it == 0, call asyncOperation() and wait for its callback to fire
//before running the rest of the stream. Otherwise immediately run the rest
//of the stream
.flatMap {
observable1(it)
observable2(it)
Observable.merge(
getSpotSearchObservable(observable1),
getSpotSearchObservable(observable2)
}
.subscribeBy(onNext = {
allDone()
view?
})
Как я могу сжать в вызове asyncOperation()
и заставить остальную часть потока ждать, пока его обратный вызов не сработает, но только при соблюдении определенного условия? Кажется, что это, вероятно, тривиальная операция в Rx, но очевидное решение не приходит на ум.
1 ответ
Решение
FlatMap это!
.flatMap {
if (it == 0) {
return@flatMap asyncOperation()
.ignoreElements()
.andThen(Observable.just(0))
}
return@flatMap Observable.just(it)
}
.flatMap {
observable1(it)
observable2(it)
Observable.merge(
getSpotSearchObservable(observable1),
getSpotSearchObservable(observable2)
)
}