Rx Java Android: как преобразовать этот блок обратного вызова в Observer
Я пытаюсь загрузить файл с помощью Amazon S3 Android SDK. Я немного использовал RX Java, но я не уверен, как преобразовать этот метод в метод, который возвращает Observable, потому что я хочу связать результат этого метода с другим вызовом Observable. Это смущает меня, я полагаю, из-за того, что это не возвращается сразу и не может вернуться, пока не изменится OnError или OnState. Как мне справиться с этими ситуациями в стиле RX?
public void uploadFile(TransferObserver transferObserver){
transferObserver.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
}
});
}
Если бы кто-то мог ответить с RX Java 2 и лямбдами, это было бы замечательно, потому что я просто продолжаю рассказывать об этом
2 ответа
Как правило, это правильный подход для соединения асинхронного / обратного вызова с реактивным, но с использованием Observable.create()
сейчас обескуражен, так как требует передовых знаний, чтобы сделать это правильно.
Вы должны использовать более свежий метод создания Observable.fromEmitter()
, который будет выглядеть точно так же:
return Observable.fromEmitter(new Action1<Emitter<Integer>>() {
@Override
public void call(Emitter<Integer> emitter) {
transObs.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
if (state == TransferState.COMPLETED)
emitter.onCompleted();
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
emitter.onError(ex);
}
});
emitter.setCancellation(new Cancellable() {
@Override
public void cancel() throws Exception {
// Deal with unsubscription:
// 1. unregister the listener to avoid memory leak
// 2. cancel the upload
}
});
}
}, Emitter.BackpressureMode.DROP);
Здесь было добавлено следующее: работа с аннулированием подписки: отмена загрузки и отмена регистрации во избежание утечек памяти, а также указание стратегии противодавления.
Вы можете прочитать больше здесь.
Дополнительные примечания:
- если вы заинтересованы в прогрессе, вы можете вызвать onNext() с прогрессом в
onProgressChanged()
и преобразовать наблюдаемое вObservable<Integer>
, - если нет, вы можете рассмотреть возможность использования
Completable
который можно наблюдать безonNext()
выбросы, но толькоonCompleted()
это может удовлетворить ваш случай, если вы не заинтересованы в показателях прогресса.
@Yosriz Я не смог заставить ваш код скомпилироваться, но вы мне немного помогли, так что, исходя из вашего ответа, вот что у меня сейчас есть:
return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() {
@Override
public void call(AsyncEmitter<Integer> emitter) {
transObs.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
if (state == TransferState.COMPLETED)
emitter.onCompleted();
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
emitter.onError(ex);
}
});
emitter.setCancellation(new AsyncEmitter.Cancellable() {
@Override
public void cancel() throws Exception {
transObs.cleanTransferListener();
}
});
}
}, AsyncEmitter.BackpressureMode.DROP);