InterruptedException в потоке кеша RxJava при отладке на Android

Иногда, когда я отлаживаю свое приложение, я сталкиваюсь с InterruptedException в RxCachedThreadScheduler-1. Вот след:

Fatal Exception: java.lang.InterruptedException
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1991)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2025)
       at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1048)
       at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:776)
       at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1035)

У меня есть пользовательское представление, в котором я подписываюсь на свою наблюдаемую версию так:

@Override
protected void onAttachedToWindow() {
    super.onAttachedToWindow();

    sub = FollowHandler.getInstance().getObservable()
            .filter(new Func1<FollowEvent, Boolean>() {
                @Override
                public Boolean call(FollowEvent followEvent) {
                    if(followEvent == null || followEvent.user == null
                            || user == null)
                        return false;

                    return followEvent.user.id == user.id;
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<FollowEvent>() {
                @Override
                public void onCompleted() {}

                @Override
                public void onError(Throwable e) {}

                @Override
                public void onNext(FollowEvent followEvent) {
                    reactToThisNiceEvent(followEvent);
                }
            });
}

@Override
protected void onDetachedFromWindow() {
    super.onDetachedFromWindow();

    if(sub != null)
        sub.unsubscribe();
}

Вот наблюдаемое:

eventSubject.asObservable()
        .observeOn(Schedulers.io())
        .doOnNext(new Action1<FollowEvent>() {
            @Override
            public void call(FollowEvent followEvent) {
                if(followEvent != null)
                    doSomethingNice(followEvent);
            }
        })
        .share();

в котором eventSubject является простым PublishSubject. Я использую RxAndroid 1.1.0 наряду с RxJava 1.1.0.

Кто-нибудь знает, почему это происходит?

1 ответ

Я не уверен, почему это происходит, но попробуйте сделать это:

sub = FollowHandler.getInstance().getObservable()
            .filter(...)
            .subscribeOn(Schedulers.io())    //  <<<<<<<<<<
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(...);

Кроме того, я думаю, что вам не нужно share():

eventSubject.asObservable()
        .doOnNext(...)
        .subscribeOn(Schedulers.io())   // <<<<< subscribeOn instead of observeOn, but actually, you don't need it here...
        .share();     // <<<<< remove it

я использую Subject как eventbus очень часто, как я описал выше. И у меня никогда не было таких проблем.

PS В onDetachedFromWindow() было бы лучше, если вы проверите, подписка отписалась или нет. Я знаю, что этот метод вызывается в основном потоке, и одновременный доступ к этому Subscription невозможно, но я думаю, что это хороший стиль

if(sub != null && !sub.isUnsubscribed())
        sub.unsubscribe();
Другие вопросы по тегам