RxJava - flatmap vs concatMap - почему порядок подписки такой же?

В соответствии с этим потоком conCatMap и flatmap отличаются только порядком, в котором отправляются элементы. Поэтому я сделал тест и создал простой поток целых чисел и хотел посмотреть, в каком порядке они будут выбрасываться. Я сделал небольшую заметку, которая будет принимать числа в диапазоне от 1 до 5 и умножать их на два. Легко.

Вот код с плоской картой:

myObservable.flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
        Log.v("myapp","from flatMap:"+integer);
        }
    });

и точно такой же код, используя concatMap:

myObservable.concatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
        Log.v("myapp","from concatmap:"+integer);
        }
    });

когда я увидел распечатки в журналах, порядок был одинаков для обоих, почему? Я думал, только concatMap сохранит порядок?

1 ответ

Решение

То, что вы видите, является совпадением. Каждый раз, когда ваш flatMap возвращает значение, которое он делает, в том же потоке, что и предыдущий.

Я изменил ваш пример, чтобы воспользоваться многопоточностью:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .flatMap(integer -> Observable.just(integer)
                .observeOn(Schedulers.computation())
                .flatMap(i -> {
                    try {
                        Thread.sleep(new Random().nextInt(1000));
                        return Observable.just(2 * i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        return Observable.error(e);
                    }
                }))
        .subscribe(System.out::println,
                Throwable::printStackTrace,
                () -> System.out.println("onCompleted"));

Я задерживаю каждого 2 * i значение случайной задержкой, чтобы вызвать другой порядок. Также я добавил observeOn(Schedulers.computation()) до этого так следующий оператор (flatMap) работает в пуле потоков вычислений - это делает магию многопоточности.

Вот вывод, который я получаю для моего примера (на Android):

I/System.out: 6
I/System.out: 4
I/System.out: 12
I/System.out: 14
I/System.out: 8
I/System.out: 2
I/System.out: 16
I/System.out: 20
I/System.out: 10
I/System.out: 18
I/System.out: onCompleted

Если я заменю flatMap после just с concatMapтогда я получаю правильно упорядоченный вывод.

Есть замечательный пост Томаса Нилда с надлежащим объяснением.

Другие вопросы по тегам