Статус Asynctask && отменить эквивалент в RxJava2 Observable?
Я пытаюсь изучить RxJava2 и преобразовываю свои AsyncTasks в Observables.
У меня есть следующий фрагмент кода, который я пытаюсь преобразовать.
if(asyncTask.getStatus() == AsyncTask.Status.RUNNING){
asyncTask.cancel();
}
asyncTask = new CustomTask();
asyncTask.execute(input);
Я попытался воссоздать следующее с помощью одноразовых принадлежностей.
Disposable currentTask;
PublishSubject input = PublishSubject.create();
Для каждого входа
if(currentTask != null) currentTask.dispose();
currentTask = input
.map(// Network calls
// returns CustomObject)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> {
// do work with result
}, throwable -> Log.e(TAG, throwable.toString()));
Тем не мение, currentTask
всегда ноль. Зачем? Это неправильный способ сделать это?
1 ответ
Вы используете Disposable
правильно, но я могу только предположить, что вы где-то запутались с предметом. Субъектами в rx могут быть как издатели, так и подписчики... и не обязательно ждать, пока subscribe(...)
начать излучать предметы. По этой причине я бы не предложил заменить ваш AsyncTask
с любым видом Subject
,
Вы можете получить подобное, более детерминированное поведение, делая это, хотя:
Observable<CustomObject> networkObservable =
Observable.create(emitter ->
{
try {
CustomObject object = doNetworking();
emitter.onNext(object);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
);
if(currentTask != null) currentTask.dispose();
currentTask = networkObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
// this next subscribe is similar to AsyncTask.execute() as it starts the stream
.subscribe(result -> {
// do work with result
}, throwable -> Log.e(TAG, throwable.toString()));
Кроме того, рассмотреть вопрос о SerialDisposable
и вам не нужно делать эти проверки на ноль / распоряжение
SerialDisposable serialDisposable = new SerialDisposable();
Атомно: установить следующий одноразовый на этот контейнер и утилизировать предыдущий (если есть) или утилизировать следующий, если контейнер был утилизирован.
Disposable disposable = networkObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(...);
serialDisposable.set(disposable); // this will call dispose on the previous task if it exists