Потоки - клонирование потока без многократной итерации - я делаю это правильно?

Я только начинаю знакомиться с потоками Котлина.

Для этого я использую их для анализа содержимого двоичного файла, который я буду моделировать с использованием следующего потока:

fun testFlow() = flow {
    println("Starting loop")

    try {
        for (i in 0..5) {
            emit(i)
            delay(100)
        }

        println("Loop has finished")
    }
    finally {
        println("Finally")
    }
}

Теперь мне нужно несколько раз содержимое файла, чтобы извлечь различные наборы информации. Однако я не хочу читать файл дважды, а только один раз.

Поскольку не существует встроенного механизма клонирования / дублирования потока, я разработал следующую вспомогательную функцию:

interface MultiConsumeBlock<T> {
    suspend fun subscribe(): Flow<T>
}

suspend fun <T> Flow<T>.multiConsume(capacity: Int = DEFAULT_CONCURRENCY, scope: CoroutineScope? = null, block: suspend MultiConsumeBlock<T>.() -> Unit) {
    val channel = buffer(capacity).broadcastIn(scope ?: CoroutineScope(coroutineContext))

    val context = object : MultiConsumeBlock<T> {
        override suspend fun subscribe(): Flow<T> {
            val subscription = channel.openSubscription()
            return flow { emitAll(subscription) }
        }
    }
    try {
        block(context)
    } finally {
        channel.cancel()
    }
}

который я потом использую вот так (подумайте об аналогии с файлом: flow a получает каждую запись, поток b только первые 3 записи (="заголовок файла") и поток c все после шапки)

fun main() = runBlocking {
    val src = testFlow()

    src.multiConsume {
        val a = subscribe().map { it }
        val b = subscribe().drop(3).map{ it + it}
        val c = subscribe().take(3).map{ it * it}

        mapOf("A" to a, "B" to b, "C" to c).map { task -> launch { task.value.collect{ println("${task.key}: $it")} } }.toList().joinAll()
    }
}

Выход:

Starting loop
A: 0
C: 1
A: 1
C: 2
A: 4
C: 3
A: 9
C: 4
A: 16
C: 5
B: 10
C: 6
B: 12
C: 7
B: 14
C: 8
B: 16
C: 9
B: 18
C: 10
B: 20
C: 11
Loop has finished
Finally

Который пока выглядит хорошо. Однако я не уверен, правильно ли я использую потоки Котлина в этом отношении.
Открываюсь ли я для тупиков, пропущенных исключений и т. Д.?

В документации просто говорится:

Все реализации интерфейса Flow должны соответствовать двум ключевым свойствам, подробно описанным ниже:

  • Сохранение контекста.
  • Исключение прозрачности.

Но я не уверен, так ли это для моей реализации или я что-то упустил.
Или, может быть, есть лучший способ вообще?

0 ответов

Другие вопросы по тегам