Почему onSubscribe не работает в rxjava?

Когда я запускаю код ниже, если я не пишу observeOn линия, приложение вылетает, потому что getView().showBlockLayout(isBlock); вызовите метод, который пытается скрыть или показать макет. но я попытался изменить ниже observeOn(AndroidSchedulers.mainThread()) в subscribeOn(AndroidSchedulers.mainThread()) и приложение снова вылетает!

subscription.add(UserStore.getInstance().getBlockObservable(databaseHelper.getConference().getUserChatId())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean isBlock) {
                    getView().showBlockLayout(isBlock);
                    databaseHelper.getConference().setBlock(isBlock);
                    mConferenceModel.setBlock(isBlock);
                }
            }));

Я также проверяю это:

subscription.add(UserStore.getInstance().getBlockObservable(databaseHelper.getConference().getUserChatId())
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean isBlock) {
                    getView().showBlockLayout(isBlock);
                    databaseHelper.getConference().setBlock(isBlock);
                    mConferenceModel.setBlock(isBlock);
                }
            }));

и неожиданно это сработало и не вылетело! Я не использовал подписку в методе getBlockObservable (потому что я знаю, что мы можем установить его один раз)

это мой UserStore учебный класс

PublishSubject<Pair<String,Boolean>> mObservableBlock;

private UserStore(){
    mObservableBlock = PublishSubject.create();
    mInstance = this;
}

public static UserStore getInstance() {
    if(mInstance == null)
        new UserStore();
    return mInstance;
}

public Observable<Boolean> getBlockObservable(final String userId){
    return mObservableBlock
            .observeOn(Schedulers.computation())
            .filter(new Func1<Pair<String,Boolean>, Boolean>() {
        @Override
        public Boolean call(Pair<String,Boolean> s) {
            if(userId.equals(s.first))
                return true;
            return false;
        }
    }).map(new Func1< Pair<String, Boolean>, Boolean>() {

        @Override
        public Boolean call(Pair<String, Boolean> UserBlock) {
            return UserBlock.second;
        }
    });
}
public void publishBlockedUser(String userId,boolean isBlock){
    mObservableBlock.onNext(new Pair<String, Boolean>(userId,isBlock));
}

и вот как я импортировал зависимость rxjava в Gradle

compile 'io.reactivex:rxjava:1.1.5'
compile 'io.reactivex:rxandroid:1.2.0'

1 ответ

Решение

Как уже упоминалось в этой статье artice:

Важным фактом является то, что подписка не работает с субъектами.

Так что вы не можете использовать подписку на темы, и мы должны использовать observerOn(AndroidSchedulers.mainThread()) до подписки. поэтому все последующие методы вызываются на mainThread после этого.

проверьте этот средний artice

Другие вопросы по тегам