Как создать механизм опроса с сопрограммами kotlin?

Я пытаюсь создать механизм опроса с использованием сопрограмм kotlin и хочу остановиться, когда нет подписчиков, и активен, когда есть хотя бы один подписчик. Мой вопрос, это sharedFlow правильный выбор в этом сценарии, или я должен использовать channel. Я пробовал использовать channelFlow но я не знаю, как закрыть канал (не cancelработа) за пределами тела блока. Кто-нибудь может помочь? Вот отрывок.

       fun poll(id: String) = channelFlow {
            while (!isClosedForSend) {
                try {
                    send(repository.getDetails(id))
                    delay(MIN_REFRESH_TIME_MS)
                } catch (throwable: Throwable) {
                    Timber.e("error -> ${throwable.message}")
                }
                invokeOnClose { Timber.e("channel flow closed.") }
        }
    } 

2 ответа

Вы можете использовать SharedFlow, который генерирует значения в широковещательном режиме (не будет генерировать новое значение, пока предыдущее не будет использовано всеми сборщиками).

      val sharedFlow = MutableSharedFlow<String>()
val scope = CoroutineScope(Job() + Dispatchers.IO)
var producer: Job()

scope.launch {
    val producer = launch() {
            sharedFlow.emit(...)
    }

    sharedFlow.subscriptionCount
              .map {count -> count > 0}
              .distinctUntilChanged()
              .collect { isActive -> if (isActive) stopProducing() else startProducing()
}

fun CoroutineScope.startProducing() {
    producer = launch() {
        sharedFlow.emit(...)
    }
        
}

fun stopProducing() {
    producer.cancel()
}

Во-первых, при вызове channelFlow(block) нет необходимости закрывать канал вручную. После выполнения блока канал автоматически закроется.

Я думаю, что функция построения сопрограмм может быть тем, что вам нужно. Но, к сожалению, это все еще экспериментальный api.

      fun poll(id: String) = someScope.produce {
    invokeOnClose { Timber.e("channel flow closed.") }

    while (true) {
        try {
            send(repository.getDetails(id))
//          delay(MIN_REFRESH_TIME_MS)   //no need
        } catch (throwable: Throwable) {
            Timber.e("error -> ${throwable.message}")
        }
    }
}

fun main() = runBlocking {
    val channel = poll("hello")

    channel.receive()

    channel.cancel()
}

Функция создания будет приостановлена, если вы не вызовете метод receive () возвращенного канала, поэтому нет необходимости откладывать.

ОБНОВЛЕНИЕ: использование broadcast для совместного использования значений в нескольких ReceiveChannel.

      fun poll(id: String) = someScope.broadcast {
    invokeOnClose { Timber.e("channel flow closed.") }

    while (true) {
        try {
            send(repository.getDetails(id))
//          delay(MIN_REFRESH_TIME_MS)   //no need
        } catch (throwable: Throwable) {
            Timber.e("error -> ${throwable.message}")
        }
    }
}

fun main() = runBlocking {
    val broadcast = poll("hello")

    val channel1 = broadcast.openSubscription()
    val channel2 = broadcast.openSubscription()
    
    channel1.receive()
    channel2.receive()

    broadcast.cancel()
}
Другие вопросы по тегам