Почему моя подписка никогда не завершается?
Я пытаюсь узнать о rxJava и реактивном программировании в контексте Android, и я чувствую, что я почти на месте, я просто не могу понять всю картину, чтобы полностью понять, что я делаю.
У меня есть код ниже, который получает список экземпляров класса с именем iApps из базы данных
myHelper m = new myHelper(getApplication());
m.getApps()
.observeOn(Schedulers.newThread())
.subscribe(currentApps::addAll,
throwable -> Log.e("Error Observable", throwable.toString() + " " + Arrays.toString(throwable.getStackTrace())),
() -> compareLists(availableApps, currentApps));
}
Который использует следующие методы: // Из моей функции вызывающей базы данных
public Callable<ArrayList<iApp>> getApps()
{
return this::getCurrentInfo;
}
Пользовательская вспомогательная функция
public class myHelper {
Context ctx;
tQuery t;
public myHelper(Context _ctx)
{
this.ctx = _ctx;
t = new tQuery(_ctx);
}
Observable<ArrayList<iApp>> getApps()
{
return makeObservable(t.getApps())
.subscribeOn(Schedulers.computation());
}
private static <T> Observable<T> makeObservable(final Callable<T> func) {
return Observable.create(
new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
try {
subscriber.onNext(func.call());
} catch (Exception ex) {
subscriber.onError(ex);
}
}
});
}
}
Однако мой по полной никогда не запускается. Я проверил onNext, просматривая результаты iApp и выводя одно из полей, чтобы видеть, что данные собираются, однако моя функция CompareLists никогда не запускается.
Может ли кто-нибудь объяснить мой недосмотр?
1 ответ
Решение
Ну, это было стыдно!
private static <T> Observable<T> makeObservable(final Callable<T> func) {
return Observable.create(
new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
try {
subscriber.onNext(func.call());
subscriber.onCompleted();
} catch (Exception ex) {
subscriber.onError(ex);
}
}
});
}