Как я могу отправлять предметы в Kotlin.Flow (например, предмет поведения)

Я хотел знать, как я могу отправлять / отправлять Kotlin.Flow, так что мой вариант использования:

В потребительской /ViewModel/Presenter я могу подписаться с collect функция:

fun observe() {
 coroutineScope.launch {
    // 1. Send event
    reopsitory.observe().collect {
      println(it)
    }
  }
}

Но проблема в Repository сторона, с RxJava мы могли бы использовать объект Поведения выставить его как Observable/Flowable и испустить новые предметы, как это:

behaviourSubject.onNext(true)

Но всякий раз, когда я строю новый поток:

flow {

}

Я могу только собирать. Как я могу отправить значения в поток?

1 ответ

Решение

Если вы хотите получить последнее значение в подписке / коллекции, вы должны использовать ConflatedBroadcastChannel:

private val channel = ConflatedBroadcastChannel<Boolean>()

Это будет копировать BehaviourSubject, чтобы выставить канал как поток:

// Repository
fun observe() {
  return channel.asFlow()
}

Теперь, чтобы отправить событие / значение, которое подвергается Flow просто отправить на этот канал.

// Repository
fun someLogicalOp() {
  channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}

Приставка:

ложный

Если вы хотите получать значения только после начала сбора, вы должны использовать BroadcastChannel вместо.

Чтобы было понятно:

Ведет себя как Rx's PublishedSubject

private val channel = BroadcastChannel<Boolean>(1)

fun broadcastChannelTest() {
  // 1. Send event
  channel.send(true)

  // 2. Start collecting
  channel
    .asFlow()
    .collect {
      println($it)
    }

  // 3. Send another event
  channel.send(false)
}

ложный

Только false печатается как первое событие было отправлено раньше collect { },


Ведет себя как Rx's BehaviourSubject

private val confChannel = ConflatedBroadcastChannel<Boolean>()

fun conflatedBroadcastChannelTest() {
  // 1. Send event
  confChannel.send(true)

  // 2. Start collecting
  confChannel
    .asFlow()
    .collect {
      println($it)
    }

  // 3. Send another event
  channel.send(false)
}

правда

ложный

Оба события печатаются, вы всегда получаете последнее значение (если присутствует).

Также хочу отметить развитие команды Котлина на DataFlow (имя ожидает рассмотрения):

Который кажется более подходящим для этого случая использования (поскольку это будет холодный поток).

Взгляните на документацию MutableStateFlow, поскольку она заменяетConflatedBroadcastChannel это будет устаревшим очень скоро.

Для лучшего контекста просмотрите все обсуждение исходной проблемы в репозитории Kotlin на Github.

Поскольку в вашем вопросе android Я добавлю реализацию для Android, которая позволит вам легко создать BehaviorSubject или PublishSubject который управляет своим собственным жизненным циклом.

Это актуально в Android, потому что вы не хотите забывать закрывать канал и утечку памяти. Эта реализация позволяет избежать необходимости явно "избавляться" от реактивного потока, привязывая его к созданию и уничтожению фрагмента / действия. Похожий наLiveData

interface EventReceiver<Message> {
    val eventFlow: Flow<Message>
}

interface EventSender<Message> {
    fun postEvent(message: Message)
    val initialMessage: Message?
}

class LifecycleEventSender<Message>(
    lifecycle: Lifecycle,
    private val coroutineScope: CoroutineScope,
    private val channel: BroadcastChannel<Message>,
    override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {

    init {
        lifecycle.addObserver(this)
    }

    override fun postEvent(message: Message) {
        if (!channel.isClosedForSend) {
            coroutineScope.launch { channel.send(message) }
        } else {
            Log.e("LifecycleEventSender","Channel is closed. Cannot send message: $message")
        }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
    fun create() {
        channel.openSubscription()
        initialMessage?.let { postEvent(it) }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun destroy() {
        channel.close()
    }
}

class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
    EventReceiver<Message> {
    override val eventFlow: Flow<Message> = channel.asFlow()
}

abstract class EventRelay<Message>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    channel: BroadcastChannel<Message>,
    initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
    EventSender<Message> by LifecycleEventSender<Message>(
        lifecycle,
        coroutineScope,
        channel,
        initialMessage
    )

Используя Lifecycle библиотеки с Android, теперь я могу создать BehaviorSubject который очищается после уничтожения активности / фрагмента

class BehaviorSubject<String>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    ConflatedBroadcastChannel(),
    initialMessage
)

или я могу создать PublishSubject с помощью буферизованного BroadcastChannel

class PublishSubject<String>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    BroadcastChannel(Channel.BUFFERED),
    initialMessage
)

И теперь я могу сделать что-то вроде этого

class MyActivity: Activity() {

    val behaviorSubject = BehaviorSubject(
        this@MyActivity.lifecycle,
        this@MyActivity.lifecycleScope
    )

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        if (savedInstanceState == null) { 

            behaviorSubject.eventFlow
                .onEach { stringEvent ->
                    Log.d("BehaviorSubjectFlow", stringEvent)
                    // "BehaviorSubjectFlow: Initial Message"
                    // "BehaviorSubjectFlow: Next Message"
                }
                .flowOn(Dispatchers.Main)
                .launchIn(this@MyActivity.lifecycleScope)

        }
    }

    override fun onResume() {
        super.onResume()

        behaviorSubject.postEvent("Next Message")
    }
}