Работа комбайна Kotlin SharedFlow. Поведение zip в конкретной ситуации

Я совмещаю два а затем выполнение длительной рабочей операции.

Вначале я знаю состояние, поэтому я передаю «начальное значение» для обоих потоков. После этого пользователь может передавать в любой поток.

Оба потока в основном независимы, но в определенной ситуации пользователь может передавать в оба потока одновременно. Это означает, что комбайн запускается дважды, а длительное рабочее задание выполняется дважды, хотя на самом деле в этом случае мне интересно получать только оба значения, но выполнять задание только один раз.

Вот что у меня есть:

      val _numbers = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val numbers: SharedFlow<Int> = _numbers
val _strings = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val strings: SharedFlow<String> = _strings

combine(numbers, strings) { (number, strings) ->
    println("values $number - $strings. Starting to perform a long working job")
}
    .launchIn(CoroutineScope(Dispatchers.IO))

runBlocking {

    delay(500)
    // This is the initial values. I always know this at start.
    _numbers.emit(0)
    _strings.emit("a")

    // Depending of user action, number or string is emitted.

    delay(100)
    _numbers.emit(1)
    delay(100)
    _numbers.emit(2)
    delay(100)
    _numbers.emit(3)
    delay(100)
    _numbers.emit(4)
    delay(100)
    _strings.emit("b")
    delay(100)
    _strings.emit("c")
    delay(100)
    _strings.emit("d")
    delay(100)
    _strings.emit("e")
    delay(100)

    // In a specific situation both values need to change but I only want to trigger the long working job once
    _numbers.emit(10)
    _strings.emit("Z")
}

Это может привести к следующему:

      values 0 - a. Starting to perform a long working job
values 1 - a. Starting to perform a long working job
values 2 - a. Starting to perform a long working job
values 3 - a. Starting to perform a long working job
values 4 - a. Starting to perform a long working job
values 4 - b. Starting to perform a long working job
values 4 - c. Starting to perform a long working job
values 4 - d. Starting to perform a long working job
values 4 - e. Starting to perform a long working job
values 10 - e. Starting to perform a long working job
values 10 - Z. Starting to perform a long working job

Или это:

      values 0 - a. Starting to perform a long working job
values 1 - a. Starting to perform a long working job
values 2 - a. Starting to perform a long working job
values 3 - a. Starting to perform a long working job
values 4 - a. Starting to perform a long working job
values 4 - b. Starting to perform a long working job
values 4 - c. Starting to perform a long working job
values 4 - d. Starting to perform a long working job
values 4 - e. Starting to perform a long working job
values 10 - Z. Starting to perform a long working job

Из-за переполнения буфера иногда я могу добиться того, чего хочу (этого последнего), но в других случаях у меня есть это меня не интересует.

Могу ли я каким-то образом заставить при передаче на два начинать долгую работу только один раз?

https://pl.kotl.in/JA1Wdhra9

1 ответ

Если вы хотите сохранить 2 потока, различие между одиночными и двойными событиями должно быть основано на времени. Вы не сможете отличить быстрое обновление строки и числа от "двойного обновления".

Если вам подходит время, используйте перед длительной обработкой:

      combine(numbers, strings) { (number, string) -> number to string }
    .debounce(50)
    .onEach { (number, string) ->
        println("values $number - $string. Starting to perform a long working job")
    }
    .launchIn(CoroutineScope(Dispatchers.IO))

Здесь, combine только строит пары из двух потоков, но все равно получает все события, а затем debounceигнорирует быструю последовательность событий и отправляет только последнюю из быстрой последовательности. Это также вносит небольшую задержку, но все зависит от того, чего вы хотите достичь.

Если разделение на основе времени вам не подходит, вам нужен способ для производителя отправлять двойные события способом, отличным от двух одиночных событий. Для этого вы можете использовать единый поток событий, и вы можете, например, определять такие события:

      sealed class Event {
    data class NumberUpdate(val value: Int): Event()
    data class StringUpdate(val value: String): Event()
    data class DoubleUpdate(val num: Int, val str: String): Event()
}

Но тогда вам придется самому написать логику "комбинирования" (сохраняя состояние последнего числа и строки):

      flow {
    var num = 0
    var str = "a"
    emit(num to str)
    events.collect { e ->
        when (e) {
            is Event.NumberUpdate -> {
                num = e.value
            }
            is Event.StringUpdate -> {
                str = e.value
            }
            is Event.DoubleUpdate -> {
                num = e.num
                str = e.str
            }
        }
        emit(num to str)
    }
}
.onEach { (number, strings) ->
    println("values $number - $strings. Starting to perform a long working job")
}
.launchIn(CoroutineScope(Dispatchers.IO))
Другие вопросы по тегам