Потоки - клонирование потока без многократной итерации - я делаю это правильно?
Я только начинаю знакомиться с потоками Котлина.
Для этого я использую их для анализа содержимого двоичного файла, который я буду моделировать с использованием следующего потока:
fun testFlow() = flow {
println("Starting loop")
try {
for (i in 0..5) {
emit(i)
delay(100)
}
println("Loop has finished")
}
finally {
println("Finally")
}
}
Теперь мне нужно несколько раз содержимое файла, чтобы извлечь различные наборы информации. Однако я не хочу читать файл дважды, а только один раз.
Поскольку не существует встроенного механизма клонирования / дублирования потока, я разработал следующую вспомогательную функцию:
interface MultiConsumeBlock<T> {
suspend fun subscribe(): Flow<T>
}
suspend fun <T> Flow<T>.multiConsume(capacity: Int = DEFAULT_CONCURRENCY, scope: CoroutineScope? = null, block: suspend MultiConsumeBlock<T>.() -> Unit) {
val channel = buffer(capacity).broadcastIn(scope ?: CoroutineScope(coroutineContext))
val context = object : MultiConsumeBlock<T> {
override suspend fun subscribe(): Flow<T> {
val subscription = channel.openSubscription()
return flow { emitAll(subscription) }
}
}
try {
block(context)
} finally {
channel.cancel()
}
}
который я потом использую вот так (подумайте об аналогии с файлом: flow a
получает каждую запись, поток b
только первые 3 записи (="заголовок файла") и поток c
все после шапки)
fun main() = runBlocking {
val src = testFlow()
src.multiConsume {
val a = subscribe().map { it }
val b = subscribe().drop(3).map{ it + it}
val c = subscribe().take(3).map{ it * it}
mapOf("A" to a, "B" to b, "C" to c).map { task -> launch { task.value.collect{ println("${task.key}: $it")} } }.toList().joinAll()
}
}
Выход:
Starting loop
A: 0
C: 1
A: 1
C: 2
A: 4
C: 3
A: 9
C: 4
A: 16
C: 5
B: 10
C: 6
B: 12
C: 7
B: 14
C: 8
B: 16
C: 9
B: 18
C: 10
B: 20
C: 11
Loop has finished
Finally
Который пока выглядит хорошо. Однако я не уверен, правильно ли я использую потоки Котлина в этом отношении.
Открываюсь ли я для тупиков, пропущенных исключений и т. Д.?
В документации просто говорится:
Все реализации интерфейса Flow должны соответствовать двум ключевым свойствам, подробно описанным ниже:
- Сохранение контекста.
- Исключение прозрачности.
Но я не уверен, так ли это для моей реализации или я что-то упустил.
Или, может быть, есть лучший способ вообще?