Статус Asynctask && отменить эквивалент в RxJava2 Observable?

Я пытаюсь изучить RxJava2 и преобразовываю свои AsyncTasks в Observables.

У меня есть следующий фрагмент кода, который я пытаюсь преобразовать.

if(asyncTask.getStatus() == AsyncTask.Status.RUNNING){
    asyncTask.cancel();
}
asyncTask = new CustomTask();  
asyncTask.execute(input);

Я попытался воссоздать следующее с помощью одноразовых принадлежностей.

Disposable currentTask;
PublishSubject input = PublishSubject.create();

Для каждого входа

if(currentTask != null) currentTask.dispose();

currentTask = input
    .map(// Network calls
         // returns CustomObject)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(result -> {
               // do work with result
               }, throwable -> Log.e(TAG, throwable.toString()));

Тем не мение, currentTask всегда ноль. Зачем? Это неправильный способ сделать это?

1 ответ

Вы используете Disposable правильно, но я могу только предположить, что вы где-то запутались с предметом. Субъектами в rx могут быть как издатели, так и подписчики... и не обязательно ждать, пока subscribe(...) начать излучать предметы. По этой причине я бы не предложил заменить ваш AsyncTask с любым видом Subject,

Вы можете получить подобное, более детерминированное поведение, делая это, хотя:

Observable<CustomObject> networkObservable =
            Observable.create(emitter ->
                    {
                        try {
                            CustomObject object = doNetworking();
                            emitter.onNext(object);
                            emitter.onComplete();
                        } catch (Exception e) {
                            emitter.onError(e);
                        }
                    }
            );

if(currentTask != null) currentTask.dispose();

currentTask = networkObservable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // this next subscribe is similar to AsyncTask.execute() as it starts the stream
        .subscribe(result -> {
            // do work with result
            }, throwable -> Log.e(TAG, throwable.toString()));

Кроме того, рассмотреть вопрос о SerialDisposable и вам не нужно делать эти проверки на ноль / распоряжение

SerialDisposable serialDisposable = new SerialDisposable();

Атомно: установить следующий одноразовый на этот контейнер и утилизировать предыдущий (если есть) или утилизировать следующий, если контейнер был утилизирован.

Disposable disposable = networkObservable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(...);

serialDisposable.set(disposable); // this will call dispose on the previous task if it exists
Другие вопросы по тегам