Rx Java Android: как преобразовать этот блок обратного вызова в Observer

Я пытаюсь загрузить файл с помощью Amazon S3 Android SDK. Я немного использовал RX Java, но я не уверен, как преобразовать этот метод в метод, который возвращает Observable, потому что я хочу связать результат этого метода с другим вызовом Observable. Это смущает меня, я полагаю, из-за того, что это не возвращается сразу и не может вернуться, пока не изменится OnError или OnState. Как мне справиться с этими ситуациями в стиле RX?

public void uploadFile(TransferObserver transferObserver){

    transferObserver.setTransferListener(new TransferListener() {
        @Override
        public void onStateChanged(int id, TransferState state) {

        }

        @Override
        public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {

        }

        @Override
        public void onError(int id, Exception ex) {

        }
  });

}

Если бы кто-то мог ответить с RX Java 2 и лямбдами, это было бы замечательно, потому что я просто продолжаю рассказывать об этом

2 ответа

Как правило, это правильный подход для соединения асинхронного / обратного вызова с реактивным, но с использованием Observable.create() сейчас обескуражен, так как требует передовых знаний, чтобы сделать это правильно.
Вы должны использовать более свежий метод создания Observable.fromEmitter(), который будет выглядеть точно так же:

    return Observable.fromEmitter(new Action1<Emitter<Integer>>() {
        @Override
        public void call(Emitter<Integer> emitter) {

            transObs.setTransferListener(new TransferListener() {
                @Override
                public void onStateChanged(int id, TransferState state) {
                    if (state == TransferState.COMPLETED)
                        emitter.onCompleted();
                }

                @Override
                public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {

                }

                @Override
                public void onError(int id, Exception ex) {
                    emitter.onError(ex);
                }
            });
            emitter.setCancellation(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    // Deal with unsubscription:
                    // 1. unregister the listener to avoid memory leak
                    // 2. cancel the upload 
                }
            });
        }
    }, Emitter.BackpressureMode.DROP);

Здесь было добавлено следующее: работа с аннулированием подписки: отмена загрузки и отмена регистрации во избежание утечек памяти, а также указание стратегии противодавления.
Вы можете прочитать больше здесь.

Дополнительные примечания:

  • если вы заинтересованы в прогрессе, вы можете вызвать onNext() с прогрессом в onProgressChanged() и преобразовать наблюдаемое в Observable<Integer>,
  • если нет, вы можете рассмотреть возможность использования Completable который можно наблюдать без onNext() выбросы, но только onCompleted() это может удовлетворить ваш случай, если вы не заинтересованы в показателях прогресса.

@Yosriz Я не смог заставить ваш код скомпилироваться, но вы мне немного помогли, так что, исходя из вашего ответа, вот что у меня сейчас есть:

return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() {
            @Override
            public void call(AsyncEmitter<Integer> emitter) {

                transObs.setTransferListener(new TransferListener() {
                    @Override
                    public void onStateChanged(int id, TransferState state) {
                        if (state == TransferState.COMPLETED)
                            emitter.onCompleted();
                    }

                    @Override
                    public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {

                    }

                    @Override
                    public void onError(int id, Exception ex) {
                        emitter.onError(ex);
                    }
                });

                emitter.setCancellation(new AsyncEmitter.Cancellable() {
                    @Override
                    public void cancel() throws Exception {

                        transObs.cleanTransferListener();
                    }
                });
            }
        }, AsyncEmitter.BackpressureMode.DROP);
Другие вопросы по тегам