Как создать механизм опроса с сопрограммами kotlin?
Я пытаюсь создать механизм опроса с использованием сопрограмм kotlin и хочу остановиться, когда нет подписчиков, и активен, когда есть хотя бы один подписчик. Мой вопрос, это
sharedFlow
правильный выбор в этом сценарии, или я должен использовать
channel
. Я пробовал использовать
channelFlow
но я не знаю, как закрыть канал (не
cancel
работа) за пределами тела блока. Кто-нибудь может помочь? Вот отрывок.
fun poll(id: String) = channelFlow {
while (!isClosedForSend) {
try {
send(repository.getDetails(id))
delay(MIN_REFRESH_TIME_MS)
} catch (throwable: Throwable) {
Timber.e("error -> ${throwable.message}")
}
invokeOnClose { Timber.e("channel flow closed.") }
}
}
2 ответа
Вы можете использовать SharedFlow, который генерирует значения в широковещательном режиме (не будет генерировать новое значение, пока предыдущее не будет использовано всеми сборщиками).
val sharedFlow = MutableSharedFlow<String>()
val scope = CoroutineScope(Job() + Dispatchers.IO)
var producer: Job()
scope.launch {
val producer = launch() {
sharedFlow.emit(...)
}
sharedFlow.subscriptionCount
.map {count -> count > 0}
.distinctUntilChanged()
.collect { isActive -> if (isActive) stopProducing() else startProducing()
}
fun CoroutineScope.startProducing() {
producer = launch() {
sharedFlow.emit(...)
}
}
fun stopProducing() {
producer.cancel()
}
Во-первых, при вызове channelFlow(block) нет необходимости закрывать канал вручную. После выполнения блока канал автоматически закроется.
Я думаю, что функция построения сопрограмм может быть тем, что вам нужно. Но, к сожалению, это все еще экспериментальный api.
fun poll(id: String) = someScope.produce {
invokeOnClose { Timber.e("channel flow closed.") }
while (true) {
try {
send(repository.getDetails(id))
// delay(MIN_REFRESH_TIME_MS) //no need
} catch (throwable: Throwable) {
Timber.e("error -> ${throwable.message}")
}
}
}
fun main() = runBlocking {
val channel = poll("hello")
channel.receive()
channel.cancel()
}
Функция создания будет приостановлена, если вы не вызовете метод receive () возвращенного канала, поэтому нет необходимости откладывать.
ОБНОВЛЕНИЕ: использование
broadcast
для совместного использования значений в нескольких ReceiveChannel.
fun poll(id: String) = someScope.broadcast {
invokeOnClose { Timber.e("channel flow closed.") }
while (true) {
try {
send(repository.getDetails(id))
// delay(MIN_REFRESH_TIME_MS) //no need
} catch (throwable: Throwable) {
Timber.e("error -> ${throwable.message}")
}
}
}
fun main() = runBlocking {
val broadcast = poll("hello")
val channel1 = broadcast.openSubscription()
val channel2 = broadcast.openSubscription()
channel1.receive()
channel2.receive()
broadcast.cancel()
}