RxJava - как остановить публикацию PublishSubject, даже если вызывается onNext()

У меня есть взгляд, в котором я звоню следующее:

// class member
var myPublishSubject = PublishSubject.create<SomeObservable>()
// later on in the class somewhere:    
while(true){
   myPublishSubject.onNext(someObservable)
}

Я хотел бы остановить эмиссию, но цикл продолжится вечно. Поэтому я хочу, чтобы вызов onNext ничего не делал. Но я боюсь, что если я вызову myPublishSubject.onComplete(), то в конечном итоге тема будет нулевой, и я получу NPE. Можно ли просто замолчать, даже если onNext() неоднократно вызывается. Это лучший способ просто отписаться?

1 ответ

Решение

Несколько заметок

Это довольно редкий случай, но если вы можете показать нам свое реальное намерение с ObservableМы могли бы помочь вам в его разработке, если не лучше, то лучше.

Что ты можешь сделать

В моих примерах я использовал только переменную flag, которая довольно проста, это можно изменить в любом триггере, который вы используете для своего проекта.

Опция 1

Вы можете напрямую вызвать onComplete на тему издателя

var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()

publishSubject.subscribe({
    currentEmittedItemCount++
    System.out.println(it)
}, {
    System.out.println(it)
})

while (true) {
    // Publish value on the subject
    publishSubject.onNext(someStringValue)

    // Test flag for trigger
    if (currentEmittedItemCount == maxEmittedItemCount) publishSubject.onComplete()

    // Print indication that the loop is still running
    System.out.println("Still looping")
}

Вариант 2

Вы также можете удерживать ссылку на подписку, а затем утилизировать ее, это немного более семантически, чем предыдущая, поскольку она выполнит блок кода без вызова onNext(t) когда ресурс утилизируется.

lateinit var disposable: Disposable // Will hold reference to the subscription
var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()

disposable = publishSubject.subscribeWith(object : DisposableObserver<String>() {
    override fun onComplete() {
        // Print indication of completion for the subject publisher
        System.out.println("Complete")
    }

    override fun onNext(t: String) {
        // Test flag count synchonizer
        currentEmittedItemCount++

        // Print out current emitted item count
        System.out.println(currentEmittedItemCount)

        // Print current string
        System.out.println(t)
    }

    override fun onError(e: Throwable) {
        // Print error
        System.out.println(e)
    }
})

while (true) {
    // Publish value on the subject
    if (!disposable.isDisposed) publishSubject.onNext(someStringValue)

    // Test flag for trigger
    if (currentEmittedItemCount == maxEmittedItemCount) {
        publishSubject.onComplete() // optional if you need to invoke `onComplete()` block on the subject
        disposable.dispose()
    }

    // Print indication that the loop is still running
    System.out.println("Still looping")
}

Читать дальше на

Поскольку наблюдатель подписан, мы должны вызвать функцию unsubscribe, чтобы избежать многократного вызова onNext.

Я предлагаю вызвать onComplete, когда работа subject.onNext() будет завершена:

вот пример

PublishSubject<Integer> source = PublishSubject.create();

source.onNext(1);
source.onComplete();

source.subscribe(getObserver());

А затем в наблюдателе мы воссоздаем еще один экземпляр PublishSubject

 source.subscribe(new Observer<Boolean>() {
                                        @Override
                                        public void onSubscribe(Disposable d) {

                                    }

                                    @Override
                                    public void onNext(Integer value) {

                                    }

                                    @Override
                                    public void onError(Throwable e) {

                                    }

                                    @Override
                                    public void onComplete() {
                                        source = PublishSubject.create();
                                    }
                                });

Надеюсь, этот подход поможет вам в вашем запросе

Другие вопросы по тегам