Как возобновить поток после исключения

У меня такой код:

val channel = BroadcastChannel<Event>(10)

fun setup() {
    scope.launch {
        channel.asFlow().
            .flatMapLatest { fetchSomeData() }
            .catch { emit(DefaultData()) }
            .onEach { handleData() }
            .collect()

    }
}

fun load() {
    channel.offer(Event.Load)      
}

В случае fetchSomeData не работает, за исключением того, что он будет пойман catchи передаются некоторые данные по умолчанию. Проблема в том, что сам поток отменяется и удаляется с подписчиков канала. Это означает, что любые новые события, предлагаемые каналу, будут игнорироваться, поскольку подписчиков больше нет.

Есть ли способ убедиться, что поток не будет отменен в случае исключения?

3 ответа

Вы должны поймать исключение fetchSomeData(), поэтому переместите catch из основного потока в fetchSomeData():

    scope.launch {
        channel.asFlow().
            .flatMapLatest { fetchSomeData().catch { emit(DefaultData()} }
            .onEach { handleData() }
            .collect()

    }

Мое решение состоит в том, чтобы просто перезапустить поток сбора

      private fun startParsingMessages() {
    coroutineScope?.launch {
        sessionController.subscribeToMessages()
            .onEach { /*code block*/ }
            .catch {
                it.cause
                    ?.let { error -> Timber.e(error) }
                    ?: Timber.e("startSession(): ${it.message}")

                startParsingMessages() //here
            }
            .collect()
    }
}

Я столкнулся с той же проблемой. Мой обходной путь выглядит примерно так:

/* Custom onEach extension function */
fun <T> Flow<T>.onEachCatching(block: suspend (T) -> Unit) = OnEachCatching(this, block)

class OnEachCatching<T>(private val src: Flow<T>, private val block: suspend (T) -> Unit, bufferCapacity: Int = Channel.CONFLATED) {

    private val okValue = Channel<T>(bufferCapacity)

    private var failBlock: (suspend (Throwable) -> Unit)? = null

    init {
        GlobalScope.launch {
            src.collect { value ->
                runCatching { block(value) }
                    .onFailure { failBlock?.invoke(it) }
                    .onSuccess { okValue.send(value) }
            }

            okValue.close()
        }
    }

    fun onFailure(block: suspend (Throwable) -> Unit) = this.also {
        failBlock = block
    }

    fun resumeFlow() = okValue.consumeAsFlow()
}

Применение:

someData
    .onEachCatching { handleData() }
    .onFailure { emit(DefaultData()) }
    .resumeFlow()
    .collect()
Другие вопросы по тегам