RxJava - как остановить публикацию PublishSubject, даже если вызывается onNext()
У меня есть взгляд, в котором я звоню следующее:
// class member
var myPublishSubject = PublishSubject.create<SomeObservable>()
// later on in the class somewhere:
while(true){
myPublishSubject.onNext(someObservable)
}
Я хотел бы остановить эмиссию, но цикл продолжится вечно. Поэтому я хочу, чтобы вызов onNext ничего не делал. Но я боюсь, что если я вызову myPublishSubject.onComplete(), то в конечном итоге тема будет нулевой, и я получу NPE. Можно ли просто замолчать, даже если onNext() неоднократно вызывается. Это лучший способ просто отписаться?
1 ответ
Несколько заметок
Это довольно редкий случай, но если вы можете показать нам свое реальное намерение с Observable
Мы могли бы помочь вам в его разработке, если не лучше, то лучше.
Что ты можешь сделать
В моих примерах я использовал только переменную flag, которая довольно проста, это можно изменить в любом триггере, который вы используете для своего проекта.
Опция 1
Вы можете напрямую вызвать onComplete
на тему издателя
var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()
publishSubject.subscribe({
currentEmittedItemCount++
System.out.println(it)
}, {
System.out.println(it)
})
while (true) {
// Publish value on the subject
publishSubject.onNext(someStringValue)
// Test flag for trigger
if (currentEmittedItemCount == maxEmittedItemCount) publishSubject.onComplete()
// Print indication that the loop is still running
System.out.println("Still looping")
}
Вариант 2
Вы также можете удерживать ссылку на подписку, а затем утилизировать ее, это немного более семантически, чем предыдущая, поскольку она выполнит блок кода без вызова onNext(t)
когда ресурс утилизируется.
lateinit var disposable: Disposable // Will hold reference to the subscription
var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()
disposable = publishSubject.subscribeWith(object : DisposableObserver<String>() {
override fun onComplete() {
// Print indication of completion for the subject publisher
System.out.println("Complete")
}
override fun onNext(t: String) {
// Test flag count synchonizer
currentEmittedItemCount++
// Print out current emitted item count
System.out.println(currentEmittedItemCount)
// Print current string
System.out.println(t)
}
override fun onError(e: Throwable) {
// Print error
System.out.println(e)
}
})
while (true) {
// Publish value on the subject
if (!disposable.isDisposed) publishSubject.onNext(someStringValue)
// Test flag for trigger
if (currentEmittedItemCount == maxEmittedItemCount) {
publishSubject.onComplete() // optional if you need to invoke `onComplete()` block on the subject
disposable.dispose()
}
// Print indication that the loop is still running
System.out.println("Still looping")
}
Читать дальше на
Поскольку наблюдатель подписан, мы должны вызвать функцию unsubscribe, чтобы избежать многократного вызова onNext.
Я предлагаю вызвать onComplete, когда работа subject.onNext() будет завершена:
вот пример
PublishSubject<Integer> source = PublishSubject.create();
source.onNext(1);
source.onComplete();
source.subscribe(getObserver());
А затем в наблюдателе мы воссоздаем еще один экземпляр PublishSubject
source.subscribe(new Observer<Boolean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
source = PublishSubject.create();
}
});
Надеюсь, этот подход поможет вам в вашем запросе