RXJava 2x отписаться от PublishSubject

Я пытаюсь преобразовать пример PublishSubject от Rx v1x до v2x. У меня есть следующее, но когда я звоню dispose(), onNext() продолжает срабатывать (как и ожидалось, как на примере), но вывод также продолжает выводиться на стандартный вывод. Оригинальный пример говорит unsubscribe() но это невозможно на Disposable:

Обновлено с полным кодом в ответ на комментарий ниже

fun main(args: Array<String>) {
//  doNoSubjectExample()
  doSubjectExample()
}

private fun doSubjectExample() {
  val disposable = TwitterSubject().observe().subscribe(
      { status -> println("Status: {$status)}") },
      { error -> println("Error callback: $error") })

  TimeUnit.SECONDS.sleep(10)
  disposable.dispose()
}

private class TwitterSubject {
  val subject = PublishSubject.create<Status>()

  init {
    val twitterStream = TwitterStreamFactory().instance
    // See: https://stackru.com/questions/37672023/how-to-create-an-instance-of-anonymous-interface-in-kotlin/37672334
    val listner = object : StatusListener {
      override fun onStatus(status: Status?) {
        subject.onNext(status)
      }

      override fun onException(ex: Exception?) {
        subject.onError(ex)
      }

      override fun onTrackLimitationNotice(numberOfLimitedStatuses: Int) {
        // Not implemented.
      }

      override fun onStallWarning(warning: StallWarning?) {
        // Not implemented.
      }

      override fun onDeletionNotice(statusDeletionNotice: StatusDeletionNotice?) {
        // Not implemented.
      }

      override fun onScrubGeo(userId: Long, upToStatusId: Long) {
        // Not implemented.
      }
    }

    Twitter4JHelper.addStatusListner(twitterStream, listner)

    twitterStream.sample()
  }

  fun observe(): Observable<Status> = subject
}

Как отписать наблюдателя от субъекта (т. Е. Как изменился этот пример для v1x для v2.x)?

0 ответов

Другие вопросы по тегам