Оператор RxJava похож на amb, но только с действительными результатами

Я хочу автоматически найти устройство в Android приложение. Поэтому я хотел бы сделать два звонка, сетевой звонок с Retrofitи не сетевой вызов с использованием специального SDK, в то же время, чтобы выяснить, какое устройство использует пользователь. Приложение должно выбрать первый результат, который предоставляет действительное значение.

я использую RxJava и попробовал это с оператором amb как это:

public Observable<LoginResponse> detectDevice(String username, String pwd) {
    return Observable.amb(device1.login(username, pwd), device2.login(username, pwd));
}

Кажется, это работает нормально, если устройство, которое нужно обнаружить, это устройство1, которое использует сетевой вызов. Но если это устройство2, которое должно быть обнаружено, он вернется onError(), поскольку device1.login() заканчивается быстрее и amb занимает первое onNext() или же onError(), Даже если device2.login() предоставляет действительный результат, он не будет принят во внимание, потому что он слишком медленный.

У меня вопрос: есть ли лучший способ получить только правильный ответ или другой оператор? Я не хочу использовать zipпотому что в будущем может появиться больше устройств, и я не хочу позволить пользователю ждать, пока запрос на вход в систему не будет завершен для каждого устройства.

3 ответа

Решение

Сообщение JohnWowUs вдохновило меня на использование materialize, но немного по-другому, это то, что я пошел с:

public Observable<LoginResponse> detectDevices(String username, String password) {

    Observable<Notification<LoginResponse>> deviceOneObservable = device1.login(username, password).timeout(2, TimeUnit.SECDONDS).materialize().take(1);
    Observable<Notification<LoginResponse>> deviceTwoObservable = device2.login(username, password).timeout(2, TimeUnit.SECONDS).materialize().take(1);

    return Observable
            .zip(deviceOneObservable, deviceTwoObservable, new Func2<Notification<LoginResponse>, Notification<LoginResponse>, Pair<Notification<LoginResponse>, Notification<LoginResponse>>>() {
                @Override
                public Pair<Notification<LoginResponse>, Notification<LoginResponse>> call(Notification<LoginResponse> loginResponseNotification, Notification<LoginResponse> loginResponseNotification2) {
                    return Pair.create(loginResponseNotification, loginResponseNotification2);
                }
            })
            .flatMap(new Func1<Pair<Notification<LoginResponse>, Notification<LoginResponse>>, Observable<LoginResponse>>() {
                @Override
                public Observable<LoginResponse> call(Pair<Notification<LoginResponse>, Notification<LoginResponse>> notificationNotificationPair) {

                    final Notification<LoginResponse> deviceOneNotification = notificationNotificationPair.first;
                    final Notification<LoginResponse> deviceTwoNotification = notificationNotificationPair.second;

                    //treat 4 different cases of device detection
                    //case1: no compatible device was detected
                    if (deviceOneNotification.isOnError() && deviceTwoNotification.isOnError()) {
                        return Observable.just(new LoginResponse(DeviceType.UNKNOWN));

                        //case2: device1 was detected
                    } else if (deviceOneNotification.isOnNext()) {
                        return Observable.just(new LoginResponse(DeviceType.DEVICE_ONE));

                       //case3:  device2 was detected
                    } else if (deviceTwoNotification.isOnNext()) {
                        return Observable.just(new LoginResponse(DeviceType.DEVICE_TWO));
                       //case4:  error has occurred
                    } else {
                        ... //error handling
                    }
                }
            }
}

Вы можете попробовать использовать materialise оператор на любой выход из login функция и посмотреть, если это ошибка, а затем использовать takeUntil Оператор молча отбрасывает любые ошибки:

List<Observable<LoginResponse>> logins = new ArrayList<>();
logins.add(device1.login(username, pwd));
logins.add(device2.login(username, pwd));
Observable.from(logins)
    .materialize()
    .takeUntil((observableNotification) -> {
        return !observableNotification.isOnError();
    }).dematerialize();

Улучшение было бы добавить timeout если нет ответа от какого-либо login функция бросить Throwable в Subscriber,

Вы могли бы попробовать

Observable.mergeDelayError(device1.login(username, pwd), device2.login(username, pwd)).first()
Другие вопросы по тегам