Как отменить / отказаться от подписки на сопрограммы Flow
Я замечаю странное поведение при попытке преждевременно отменить поток. Взгляните на следующий пример.
Это простой поток, который выдает целочисленные значения
private fun createFlow() = flow {
repeat(10000) {
emit(it)
}
}
Затем я звоню createFlow
функция с использованием этого кода
CoroutineScope(Dispatchers.Main).launch {
createFlow().collect {
Log.i("Main", "$it isActive $isActive")
if (it == 2) {
cancel()
}
}
}
Это то, что распечатано
0 isActive true
1 isActive true
2 isActive true
3 isActive false
4 isActive false
etc...etc
Теперь я ожидал бы, что поток должен прекратить испускать целые числа, как только он достигнет значения 2, но вместо этого он фактически переключает флаг isActive на false и продолжает излучать без остановки.
Когда я добавляю задержку между выбросами, поток ведет себя так, как я ожидал.
private fun createFlow() = flow {
repeat(10000) {
delay(500) //add a delay
emit(it)
}
}
Это то, что распечатывается после повторного вызова потока (что является ожидаемым поведением).
0 isActive true
1 isActive true
2 isActive true
Что я могу сделать, чтобы отменить выброс потока точно на указанном значении без добавления задержки?
1 ответ
Я нашел обходной путь в этой связанной проблеме
Я заменил каждый collect
с safeCollect
функция в моем проекте:
/**
* Only proceed with the given action if the coroutine has not been cancelled.
* Necessary because Flow.collect receives items even after coroutine was cancelled
* https://github.com/Kotlin/kotlinx.coroutines/issues/1265
*/
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (T) -> Unit) {
collect {
coroutineContext.ensureActive()
action(it)
}
}
Я хочу добавить, что в версии 1.3.7 выбросы от Flow Builder теперь проверяют статус отмены и могут быть отменены правильно. Таким образом, рассматриваемый код будет работать, как ожидалось
Я придумал это недавно
кажется, что он фактически отменится, только если он достигнет точки приостановки, и в вашем коде, который излучает, такой точки нет
для решения этой проблемы либо добавьте yield() между выбросами, либо другую функцию приостановки, например delay(100)