Как я могу отправлять предметы в Kotlin.Flow (например, предмет поведения)
Я хотел знать, как я могу отправлять / отправлять Kotlin.Flow
, так что мой вариант использования:
В потребительской /ViewModel/Presenter я могу подписаться с collect
функция:
fun observe() {
coroutineScope.launch {
// 1. Send event
reopsitory.observe().collect {
println(it)
}
}
}
Но проблема в Repository
сторона, с RxJava мы могли бы использовать объект Поведения выставить его как Observable/Flowable
и испустить новые предметы, как это:
behaviourSubject.onNext(true)
Но всякий раз, когда я строю новый поток:
flow {
}
Я могу только собирать. Как я могу отправить значения в поток?
1 ответ
Если вы хотите получить последнее значение в подписке / коллекции, вы должны использовать ConflatedBroadcastChannel:
private val channel = ConflatedBroadcastChannel<Boolean>()
Это будет копировать BehaviourSubject
, чтобы выставить канал как поток:
// Repository
fun observe() {
return channel.asFlow()
}
Теперь, чтобы отправить событие / значение, которое подвергается Flow
просто отправить на этот канал.
// Repository
fun someLogicalOp() {
channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}
Приставка:
ложный
Если вы хотите получать значения только после начала сбора, вы должны использовать BroadcastChannel
вместо.
Чтобы было понятно:
Ведет себя как Rx's PublishedSubject
private val channel = BroadcastChannel<Boolean>(1)
fun broadcastChannelTest() {
// 1. Send event
channel.send(true)
// 2. Start collecting
channel
.asFlow()
.collect {
println($it)
}
// 3. Send another event
channel.send(false)
}
ложный
Только false
печатается как первое событие было отправлено раньше collect { }
,
Ведет себя как Rx's BehaviourSubject
private val confChannel = ConflatedBroadcastChannel<Boolean>()
fun conflatedBroadcastChannelTest() {
// 1. Send event
confChannel.send(true)
// 2. Start collecting
confChannel
.asFlow()
.collect {
println($it)
}
// 3. Send another event
channel.send(false)
}
правда
ложный
Оба события печатаются, вы всегда получаете последнее значение (если присутствует).
Также хочу отметить развитие команды Котлина на DataFlow
(имя ожидает рассмотрения):
Который кажется более подходящим для этого случая использования (поскольку это будет холодный поток).
Взгляните на документацию MutableStateFlow, поскольку она заменяетConflatedBroadcastChannel
это будет устаревшим очень скоро.
Для лучшего контекста просмотрите все обсуждение исходной проблемы в репозитории Kotlin на Github.
Поскольку в вашем вопросе android
Я добавлю реализацию для Android, которая позволит вам легко создать BehaviorSubject
или PublishSubject
который управляет своим собственным жизненным циклом.
Это актуально в Android, потому что вы не хотите забывать закрывать канал и утечку памяти. Эта реализация позволяет избежать необходимости явно "избавляться" от реактивного потока, привязывая его к созданию и уничтожению фрагмента / действия. Похожий наLiveData
interface EventReceiver<Message> {
val eventFlow: Flow<Message>
}
interface EventSender<Message> {
fun postEvent(message: Message)
val initialMessage: Message?
}
class LifecycleEventSender<Message>(
lifecycle: Lifecycle,
private val coroutineScope: CoroutineScope,
private val channel: BroadcastChannel<Message>,
override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {
init {
lifecycle.addObserver(this)
}
override fun postEvent(message: Message) {
if (!channel.isClosedForSend) {
coroutineScope.launch { channel.send(message) }
} else {
Log.e("LifecycleEventSender","Channel is closed. Cannot send message: $message")
}
}
@OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
fun create() {
channel.openSubscription()
initialMessage?.let { postEvent(it) }
}
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun destroy() {
channel.close()
}
}
class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
EventReceiver<Message> {
override val eventFlow: Flow<Message> = channel.asFlow()
}
abstract class EventRelay<Message>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
channel: BroadcastChannel<Message>,
initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
EventSender<Message> by LifecycleEventSender<Message>(
lifecycle,
coroutineScope,
channel,
initialMessage
)
Используя Lifecycle
библиотеки с Android, теперь я могу создать BehaviorSubject
который очищается после уничтожения активности / фрагмента
class BehaviorSubject<String>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
initialMessage = "Initial Message"
) : EventRelay<String>(
lifecycle,
coroutineScope,
ConflatedBroadcastChannel(),
initialMessage
)
или я могу создать PublishSubject
с помощью буферизованного BroadcastChannel
class PublishSubject<String>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
initialMessage = "Initial Message"
) : EventRelay<String>(
lifecycle,
coroutineScope,
BroadcastChannel(Channel.BUFFERED),
initialMessage
)
И теперь я могу сделать что-то вроде этого
class MyActivity: Activity() {
val behaviorSubject = BehaviorSubject(
this@MyActivity.lifecycle,
this@MyActivity.lifecycleScope
)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
if (savedInstanceState == null) {
behaviorSubject.eventFlow
.onEach { stringEvent ->
Log.d("BehaviorSubjectFlow", stringEvent)
// "BehaviorSubjectFlow: Initial Message"
// "BehaviorSubjectFlow: Next Message"
}
.flowOn(Dispatchers.Main)
.launchIn(this@MyActivity.lifecycleScope)
}
}
override fun onResume() {
super.onResume()
behaviorSubject.postEvent("Next Message")
}
}