Оператор share() не работает для Observable в Rxjava

У меня есть сценарий, где у меня есть эмиттер, который постоянно излучает данные, как это

fun subscribeForEvents(): Flowable<InkChannel> {
    return Flowable.create<InkChannel>({
        if (inkDevice.availableDeviceServices.contains(DeviceServiceType.EVENT_DEVICE_SERVICE)) {
            (inkDevice.getDeviceService(DeviceServiceType.EVENT_DEVICE_SERVICE) as EventDeviceService).subscribe(object : EventCallback {
                override fun onUserActionExpected(p0: UserAction?) {
                    it.onNext(InkChannel.UserActionEvent(p0))
                }

                override fun onEvent(p0: InkDeviceEvent?, p1: Any?) {
                    it.onNext(InkChannel.InkEvents<Any>(p0, p1))
                }

                override fun onUserActionCompleted(p0: UserAction?, p1: Boolean) {

                }
            }

            )
        }
    }, BackpressureStrategy.BUFFER).share()

}

теперь у меня есть сервис, который я запускаю при запуске приложения и слушаю его

  inkDeviceBus.subscribeForEvents()
                .filter { it -> (it as InkChannel.InkEvents<*>).event == InkDeviceEvent.STATUS_CHANGED }
                .map { it -> it as InkChannel.InkEvents<*> }
                .map { it -> it.value.toString() }
                .filter { value -> value == "CONNECTED" || value == "DISCONNECTED" }
                .map { it -> it == "CONNECTED" }
                .subscribeBy { b ->
                    if (b) stopSelf()
                }

У меня есть другое действие MainActivity, которое вызывается при запуске, где я наблюдаю то же событие. Теперь проблема в том, что только слушатель в службе получает события, а активность не получает никаких событий.

Теперь, когда я удаляю слушателя из службы, активность начинает получать события. Я использовал общий ресурс оператора для обмена наблюдаемым, но, похоже, он не работает

1 ответ

.share() Влияет только на экземпляр, к которому он обращен. С каждым звонком subscribeForEvents создает новый экземпляр .share() не меняет поведение.

Вам нужно позвонить subscribeForEvents один раз, а затем используйте возвращенное значение, когда вы хотите получать события. Пока используется один и тот же объект, он будет использовать общий слушатель.

Другие вопросы по тегам