Почему моя подписка никогда не завершается?

Я пытаюсь узнать о rxJava и реактивном программировании в контексте Android, и я чувствую, что я почти на месте, я просто не могу понять всю картину, чтобы полностью понять, что я делаю.

У меня есть код ниже, который получает список экземпляров класса с именем iApps из базы данных

 myHelper m = new myHelper(getApplication());
        m.getApps()
                .observeOn(Schedulers.newThread())
                .subscribe(currentApps::addAll,
                        throwable -> Log.e("Error Observable", throwable.toString() + " " + Arrays.toString(throwable.getStackTrace())),
                        () -> compareLists(availableApps, currentApps));
}

Который использует следующие методы: // Из моей функции вызывающей базы данных

public  Callable<ArrayList<iApp>> getApps()
    {
        return this::getCurrentInfo;
    }

Пользовательская вспомогательная функция

public class myHelper {

    Context ctx;
    tQuery t;
    public myHelper(Context _ctx)
    {
        this.ctx = _ctx;
        t = new tQuery(_ctx);
    }

    Observable<ArrayList<iApp>> getApps()
    {
        return makeObservable(t.getApps())
                .subscribeOn(Schedulers.computation());
    }

    private static <T> Observable<T> makeObservable(final Callable<T> func) {
        return Observable.create(
                new Observable.OnSubscribe<T>() {
                    @Override
                    public void call(Subscriber<? super T> subscriber) {
                        try {
                            subscriber.onNext(func.call());

                        } catch (Exception ex) {
                            subscriber.onError(ex);
                        }
                    }
                });
    }

}

Однако мой по полной никогда не запускается. Я проверил onNext, просматривая результаты iApp и выводя одно из полей, чтобы видеть, что данные собираются, однако моя функция CompareLists никогда не запускается.

Может ли кто-нибудь объяснить мой недосмотр?

1 ответ

Решение

Ну, это было стыдно!

private static <T> Observable<T> makeObservable(final Callable<T> func) {
        return Observable.create(
                new Observable.OnSubscribe<T>() {
                    @Override
                    public void call(Subscriber<? super T> subscriber) {
                        try {
                            subscriber.onNext(func.call());
                            subscriber.onCompleted();

                        } catch (Exception ex) {
                            subscriber.onError(ex);
                        }
                    }
                });
    }
Другие вопросы по тегам