В чем разница между concatMap и flatMap в RxJava

Кажется, что эти 2 функции очень похожи. Они имеют одинаковую подпись (принимая rx.functions.Func1<? super T, ? extends Observable<? extends R>> func), и их мраморные диаграммы выглядят точно так же. Не могу вставить здесь фотографии, но вот для concatMap, а вот для flatMap. Кажется, есть некоторая тонкая разница в описании Observableгде один произведен concatMap содержит элементы, полученные в результате конкатенации результирующих наблюдаемых, и тот, который создан flatMap содержит элементы, полученные в результате первого слияния результирующих Observables и выдачи результата этого слияния.

Однако эта тонкость совершенно неясна для меня. Может кто-нибудь дать лучшее объяснение этой разницы, и в идеале привести несколько примеров, иллюстрирующих эту разницу.

7 ответов

Решение


Как вы писали, эти две функции очень похожи, и тонкое различие заключается в том, как создается выход (после применения функции отображения).

Плоская карта использует оператор слияния, а concatMap использует оператор concat.

Как вы видите, упорядочена выходная последовательность concatMap - все элементы, испускаемые первой наблюдаемой, испускаются раньше, чем любой из элементов, испускаемых второй наблюдаемой,
в то время как выходная последовательность flatMap объединена - элементы, испускаемые объединенной наблюдаемой, могут появляться в любом порядке, независимо от того, из какого источника наблюдаемой они поступили.

Несмотря на то, что ответы здесь хорошие, было нелегко определить разницу без примера. Итак, я создал простой пример для этого:

@Test
public void flatMapVsConcatMap() throws Exception {
    System.out.println("******** Using flatMap() *********");
    Observable.range(1, 15)
            .flatMap(item -> Observable.just(item).delay(1, TimeUnit.MILLISECONDS))
            .subscribe(x -> System.out.print(x + " "));

    Thread.sleep(100);

    System.out.println("\n******** Using concatMap() *********");
    Observable.range(1, 15)
            .concatMap(item -> Observable.just(item).delay(1, TimeUnit.MILLISECONDS))
            .subscribe(x -> System.out.print(x + " "));

    Thread.sleep(100);
}

******** Использование flatMap () *********

1 2 3 4 5 6 7 9 8 11 13 15 10 12 14

******** Использование concatMap () *********

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

Как видно из результатов, результаты для flatMap неупорядочены в то время как для concatMap они.

Одно очень важное отличие: concatMap ожидает завершения текущей излучаемой наблюдаемой и flatMap не делает. flatMap пытается начать как можно больше. Проще говоря, вы не можете объединить что-то бесконечное. Просто убедитесь, что наблюдаемые вами выбросы concatMap может завершиться, иначе весь поток застрянет в ожидании завершения текущей наблюдаемой, чтобы объединить следующий.

Я нахожу пример в ответе, за который проголосовали большинство, не очень понятным, поэтому я публикую тот, который помог мне понять разницу между flatMap и concatMap.

FlatMap принимает выбросов от источника наблюдаемого, а затем создать новый наблюдаемым и объединить его в оригинальной цепи, в то время как concatMap Concat его исходной цепи.

Основное отличие состоит в том, что concatMap() будет последовательно объединять каждый отображаемый Observable и запускать его по одному. Он перейдет к следующему Observable только тогда, когда текущий вызовет onComplete().

Вот пример flatMap:

private void flatMapVsConcatMap() throws InterruptedException {
    Observable.just(5, 2, 4, 1)
            .flatMap(
                    second ->
                            Observable.just("Emit delayed with " + second + " second")
                                    .delay(second, TimeUnit.SECONDS)
            )
            .subscribe(
                    System.out::println,
                    Throwable::printStackTrace
            );

    Thread.sleep(15_000);
}

Результатом будет:

Излучение с задержкой на 1 секунду
Излучение с задержкой на 2 секунды
Излучение с задержкой на 4 секунды
Излучение с задержкой на 5 секунд

Вот пример concatMap:

private void flatMapVsConcatMap() throws InterruptedException {
    Observable.just(5, 2, 4, 1)
            .concatMap(
                    second ->
                            Observable.just("Emit delayed with " + second + " second")
                                    .delay(second, TimeUnit.SECONDS)
            )
            .subscribe(
                    System.out::println,
                    Throwable::printStackTrace
            );

    Thread.sleep(15_000);
}

Результатом будет:

Излучение с задержкой на 5 секунд.
Излучение с задержкой на 2 секунды.
Излучение с задержкой на 4 секунды.
Излучение с задержкой на 1 секунду.

Обратите внимание на использование Thread.sleep(), поскольку delay() по умолчанию работает в планировщике вычислений.

Другие уже указали ответ, но в случае, если он не слишком очевиден, существует риск создания нежелательного параллелизма с помощью flatMap, если это нежелательно, вы можете использовать concatMap или перегрузку flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency)

Во-первых, flatMap - это то же самое, что mergeMap в Rxjs. Так что это на одну путаницу меньше. Итак, есть две наблюдаемые..

1) o1: простой список предметов из (['Китти','Дональд','Бэтмен'])

2) process_o1(): process_o1() - это функция, которая принимает в качестве одного параметра "элемент" и что-то делает с ним, а также возвращает Observable, который по завершении выдает "done with [item]".

o1.pipe(mergeMap(item => process_o1(item))).subscribe(data => {
console.log(data);
});

Здесь мы увидим:- С Кити покончено.

сделано с Дональдом.

сделано с Бэтменом.

без всякой гарантии, что Китти появится раньше Дональда, а Дональд появится перед Бэтменом. Это потому, что, как только внешний наблюдаемый объект испускает элемент, подписывается внутренний наблюдаемый объект.

=== Но в случае concatMap:-

o1.pipe(concatMap(item => process_o1(item))).subscribe(data => {
console.log(data);
});

У нас есть гарантия следующей последовательности:-

с Кити покончено.

сделано с Дональдом.

сделано с Бэтменом.

Поскольку с оператором concatMap внутренний Observable не подписывается до возврата предыдущего внутреннего Observable.

Внешняя наблюдаемая может просто продолжить и выдать все свои значения, но concatMap будет следить за тем, чтобы она обрабатывала каждое из этих значений одно за другим и поддерживала порядок. Отсюда и название concatMap.

В сущности, если вы хотите поддерживать порядок выполнения вещей, вам следует использовать concatMap. Но если вас не волнует порядок, вы можете продолжить с mergeMap, который будет подписываться на все внутренние Observables сразу и продолжать выдавать значения по мере их возврата.

flatMap - merge - если создается новый элемент, он имеет приоритет

concatMap - concatenate - добавить в конец - испустить полную последовательность и только после этого (предыдущая была завершена) может испустить следующую последовательность

Другие вопросы по тегам