Передача / передача значений потока в BroadcastChannel

Сегодня изрядно застрял в проблеме с потоками / каналами Kotlin. По сути, я хочу взять значения, испускаемые из потока, и немедленно отправить их в канал. Затем мы подписываемся на этот канал как на поток с помощью открытого метода. Пример использования здесь - иметь подписку на канал, которая всегда активна, и поток, который можно включать и выключать независимо.

private val dataChannel = BroadcastChannel<Data>(1)

 suspend fun poll() {
    poller.start(POLLING_PERIOD_MILLISECONDS)
        .collect {
            dataChannel.send(it)
        }
 }

 suspend fun stopPoll() {
     poller.stop()
 }

 suspend fun subscribe(): Flow<Data> {
     return dataChannel.asFlow()
 }

Простой пример использования, который у меня есть, - это опросчик, который возвращает channelFlow. В идеале я мог бы затем передать в канал в методе сбора. Хотя это, похоже, не работает. Моя сопрограмма-новичок думала, что, поскольку сбор и отправка приостанавливаются, выбросы приостанавливаются в сборке, и мы застреваем.

Есть ли какие-нибудь встроенные функции для потока или канала, которые могут справиться с этим или любым другим способом достижения такого поведения?

1 ответ

Для вашего случая вы можете попробовать использовать горячий поток данныхSharedFlowвместо Channel:

      private val dataFlow = MutableSharedFlow<String>(extraBufferCapacity = 1)

 suspend fun poll() {
    poller.start(POLLING_PERIOD_MILLISECONDS)
        .collect {
            dataFlow.tryEmit(it)
        }
 }

 suspend fun stopPoll() {
     poller.stop()
 }

 fun subscribe(): Flow<Data> {
     return dataFlow
 }

tryEmit()- Пытается передать значение этому общему потоку без приостановки, поэтому его вызов не приостанавливает collectблокировать.

Другие вопросы по тегам