В чем разница между concatMap и flatMap в RxJava
Кажется, что эти 2 функции очень похожи. Они имеют одинаковую подпись (принимая rx.functions.Func1<? super T, ? extends Observable<? extends R>> func
), и их мраморные диаграммы выглядят точно так же. Не могу вставить здесь фотографии, но вот для concatMap, а вот для flatMap. Кажется, есть некоторая тонкая разница в описании Observable
где один произведен concatMap
содержит элементы, полученные в результате конкатенации результирующих наблюдаемых, и тот, который создан flatMap
содержит элементы, полученные в результате первого слияния результирующих Observables и выдачи результата этого слияния.
Однако эта тонкость совершенно неясна для меня. Может кто-нибудь дать лучшее объяснение этой разницы, и в идеале привести несколько примеров, иллюстрирующих эту разницу.
7 ответов
Как вы писали, эти две функции очень похожи, и тонкое различие заключается в том, как создается выход (после применения функции отображения).
Плоская карта использует оператор слияния, а concatMap использует оператор concat.
Как вы видите, упорядочена выходная последовательность concatMap - все элементы, испускаемые первой наблюдаемой, испускаются раньше, чем любой из элементов, испускаемых второй наблюдаемой,
в то время как выходная последовательность flatMap объединена - элементы, испускаемые объединенной наблюдаемой, могут появляться в любом порядке, независимо от того, из какого источника наблюдаемой они поступили.
Несмотря на то, что ответы здесь хорошие, было нелегко определить разницу без примера. Итак, я создал простой пример для этого:
@Test
public void flatMapVsConcatMap() throws Exception {
System.out.println("******** Using flatMap() *********");
Observable.range(1, 15)
.flatMap(item -> Observable.just(item).delay(1, TimeUnit.MILLISECONDS))
.subscribe(x -> System.out.print(x + " "));
Thread.sleep(100);
System.out.println("\n******** Using concatMap() *********");
Observable.range(1, 15)
.concatMap(item -> Observable.just(item).delay(1, TimeUnit.MILLISECONDS))
.subscribe(x -> System.out.print(x + " "));
Thread.sleep(100);
}
******** Использование flatMap () *********
1 2 3 4 5 6 7 9 8 11 13 15 10 12 14
******** Использование concatMap () *********
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Как видно из результатов, результаты для flatMap
неупорядочены в то время как для concatMap
они.
Одно очень важное отличие: concatMap
ожидает завершения текущей излучаемой наблюдаемой и flatMap
не делает. flatMap
пытается начать как можно больше. Проще говоря, вы не можете объединить что-то бесконечное. Просто убедитесь, что наблюдаемые вами выбросы concatMap
может завершиться, иначе весь поток застрянет в ожидании завершения текущей наблюдаемой, чтобы объединить следующий.
Я нахожу пример в ответе, за который проголосовали большинство, не очень понятным, поэтому я публикую тот, который помог мне понять разницу между flatMap и concatMap.
FlatMap принимает выбросов от источника наблюдаемого, а затем создать новый наблюдаемым и объединить его в оригинальной цепи, в то время как concatMap Concat его исходной цепи.
Основное отличие состоит в том, что concatMap() будет последовательно объединять каждый отображаемый Observable и запускать его по одному. Он перейдет к следующему Observable только тогда, когда текущий вызовет onComplete().
Вот пример flatMap:
private void flatMapVsConcatMap() throws InterruptedException {
Observable.just(5, 2, 4, 1)
.flatMap(
second ->
Observable.just("Emit delayed with " + second + " second")
.delay(second, TimeUnit.SECONDS)
)
.subscribe(
System.out::println,
Throwable::printStackTrace
);
Thread.sleep(15_000);
}
Результатом будет:
Излучение с задержкой на 1 секунду
Излучение с задержкой на 2 секунды
Излучение с задержкой на 4 секунды
Излучение с задержкой на 5 секунд
Вот пример concatMap:
private void flatMapVsConcatMap() throws InterruptedException {
Observable.just(5, 2, 4, 1)
.concatMap(
second ->
Observable.just("Emit delayed with " + second + " second")
.delay(second, TimeUnit.SECONDS)
)
.subscribe(
System.out::println,
Throwable::printStackTrace
);
Thread.sleep(15_000);
}
Результатом будет:
Излучение с задержкой на 5 секунд.
Излучение с задержкой на 2 секунды.
Излучение с задержкой на 4 секунды.
Излучение с задержкой на 1 секунду.
Обратите внимание на использование Thread.sleep(), поскольку delay() по умолчанию работает в планировщике вычислений.
Другие уже указали ответ, но в случае, если он не слишком очевиден, существует риск создания нежелательного параллелизма с помощью flatMap, если это нежелательно, вы можете использовать concatMap или перегрузку
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency)
Во-первых, flatMap - это то же самое, что mergeMap в Rxjs. Так что это на одну путаницу меньше. Итак, есть две наблюдаемые..
1) o1: простой список предметов из (['Китти','Дональд','Бэтмен'])
2) process_o1(): process_o1() - это функция, которая принимает в качестве одного параметра "элемент" и что-то делает с ним, а также возвращает Observable, который по завершении выдает "done with [item]".
o1.pipe(mergeMap(item => process_o1(item))).subscribe(data => {
console.log(data);
});
Здесь мы увидим:- С Кити покончено.
сделано с Дональдом.
сделано с Бэтменом.
без всякой гарантии, что Китти появится раньше Дональда, а Дональд появится перед Бэтменом. Это потому, что, как только внешний наблюдаемый объект испускает элемент, подписывается внутренний наблюдаемый объект.
=== Но в случае concatMap:-
o1.pipe(concatMap(item => process_o1(item))).subscribe(data => {
console.log(data);
});
У нас есть гарантия следующей последовательности:-
с Кити покончено.
сделано с Дональдом.
сделано с Бэтменом.
Поскольку с оператором concatMap внутренний Observable не подписывается до возврата предыдущего внутреннего Observable.
Внешняя наблюдаемая может просто продолжить и выдать все свои значения, но concatMap будет следить за тем, чтобы она обрабатывала каждое из этих значений одно за другим и поддерживала порядок. Отсюда и название concatMap.
В сущности, если вы хотите поддерживать порядок выполнения вещей, вам следует использовать concatMap. Но если вас не волнует порядок, вы можете продолжить с mergeMap, который будет подписываться на все внутренние Observables сразу и продолжать выдавать значения по мере их возврата.
flatMap
- merge - если создается новый элемент, он имеет приоритет
concatMap
- concatenate - добавить в конец - испустить полную последовательность и только после этого (предыдущая была завершена) может испустить следующую последовательность
![](/images/1656cebe09723d80b7ef75f1c290345e607df35e.png)