Как возобновить поток после исключения
У меня такой код:
val channel = BroadcastChannel<Event>(10)
fun setup() {
scope.launch {
channel.asFlow().
.flatMapLatest { fetchSomeData() }
.catch { emit(DefaultData()) }
.onEach { handleData() }
.collect()
}
}
fun load() {
channel.offer(Event.Load)
}
В случае fetchSomeData
не работает, за исключением того, что он будет пойман catch
и передаются некоторые данные по умолчанию. Проблема в том, что сам поток отменяется и удаляется с подписчиков канала. Это означает, что любые новые события, предлагаемые каналу, будут игнорироваться, поскольку подписчиков больше нет.
Есть ли способ убедиться, что поток не будет отменен в случае исключения?
3 ответа
Вы должны поймать исключение fetchSomeData(), поэтому переместите
catch
из основного потока в fetchSomeData():
scope.launch {
channel.asFlow().
.flatMapLatest { fetchSomeData().catch { emit(DefaultData()} }
.onEach { handleData() }
.collect()
}
Мое решение состоит в том, чтобы просто перезапустить поток сбора
private fun startParsingMessages() {
coroutineScope?.launch {
sessionController.subscribeToMessages()
.onEach { /*code block*/ }
.catch {
it.cause
?.let { error -> Timber.e(error) }
?: Timber.e("startSession(): ${it.message}")
startParsingMessages() //here
}
.collect()
}
}
Я столкнулся с той же проблемой. Мой обходной путь выглядит примерно так:
/* Custom onEach extension function */
fun <T> Flow<T>.onEachCatching(block: suspend (T) -> Unit) = OnEachCatching(this, block)
class OnEachCatching<T>(private val src: Flow<T>, private val block: suspend (T) -> Unit, bufferCapacity: Int = Channel.CONFLATED) {
private val okValue = Channel<T>(bufferCapacity)
private var failBlock: (suspend (Throwable) -> Unit)? = null
init {
GlobalScope.launch {
src.collect { value ->
runCatching { block(value) }
.onFailure { failBlock?.invoke(it) }
.onSuccess { okValue.send(value) }
}
okValue.close()
}
}
fun onFailure(block: suspend (Throwable) -> Unit) = this.also {
failBlock = block
}
fun resumeFlow() = okValue.consumeAsFlow()
}
Применение:
someData
.onEachCatching { handleData() }
.onFailure { emit(DefaultData()) }
.resumeFlow()
.collect()