RXJava 2x отписаться от PublishSubject
Я пытаюсь преобразовать пример PublishSubject
от Rx v1x до v2x. У меня есть следующее, но когда я звоню dispose()
, onNext()
продолжает срабатывать (как и ожидалось, как на примере), но вывод также продолжает выводиться на стандартный вывод. Оригинальный пример говорит unsubscribe()
но это невозможно на Disposable
:
Обновлено с полным кодом в ответ на комментарий ниже
fun main(args: Array<String>) {
// doNoSubjectExample()
doSubjectExample()
}
private fun doSubjectExample() {
val disposable = TwitterSubject().observe().subscribe(
{ status -> println("Status: {$status)}") },
{ error -> println("Error callback: $error") })
TimeUnit.SECONDS.sleep(10)
disposable.dispose()
}
private class TwitterSubject {
val subject = PublishSubject.create<Status>()
init {
val twitterStream = TwitterStreamFactory().instance
// See: https://stackru.com/questions/37672023/how-to-create-an-instance-of-anonymous-interface-in-kotlin/37672334
val listner = object : StatusListener {
override fun onStatus(status: Status?) {
subject.onNext(status)
}
override fun onException(ex: Exception?) {
subject.onError(ex)
}
override fun onTrackLimitationNotice(numberOfLimitedStatuses: Int) {
// Not implemented.
}
override fun onStallWarning(warning: StallWarning?) {
// Not implemented.
}
override fun onDeletionNotice(statusDeletionNotice: StatusDeletionNotice?) {
// Not implemented.
}
override fun onScrubGeo(userId: Long, upToStatusId: Long) {
// Not implemented.
}
}
Twitter4JHelper.addStatusListner(twitterStream, listner)
twitterStream.sample()
}
fun observe(): Observable<Status> = subject
}
Как отписать наблюдателя от субъекта (т. Е. Как изменился этот пример для v1x для v2.x)?