¿Как именно работает оператор mergeMap и в каких случаях он используется?
Перед тем, как приехать сюда, я прочитал официальную документацию Rxjs и некоторых других страниц, но мне все еще не ясно. Я понял следующее:
Он используется для "объединения" двух наблюдаемых и, таким образом, получения единственного наблюдаемого в результате. Я также видел, что он используется для "сглаживания" наблюдаемого (я тоже не очень понимаю).
Теперь... У меня есть дни, когда я пытаюсь запрограммировать реестр пользователей с помощью Angular и Node.js с Express, и я нашел небольшое руководство, которое я решил использовать, и в нем есть этот код:
import { Injectable, Injector } from '@angular/core';
import { HttpClient, HttpInterceptor, HttpRequest, HttpHandler, HttpEvent, HttpErrorResponse } from '@angular/common/http';
import { Observable, throwError } from 'rxjs';
import { catchError, retry, mergeMap } from 'rxjs/operators'
import { AuthenticationService } from './authentication.service';
@Injectable({
providedIn: 'root'
})
export class AppInterceptor implements HttpInterceptor {
constructor(private injector: Injector) { }
intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
let accessToken = "", refreshToken = ""
const tokens = JSON.parse(sessionStorage.getItem("tokens"))
if (tokens) {
accessToken = tokens.accessToken
refreshToken = tokens.refreshToken
}
let clonHttp: HttpRequest<any>
clonHttp = tokens ? req.clone({ headers: req.headers.append("Authorization", `Bearer ${accessToken}`) }) : req
let auth = this.injector.get(AuthenticationService);
return next.handle(clonHttp)
.pipe(
catchError((error: HttpErrorResponse) => {
if (error.error instanceof ErrorEvent) {
console.log("error event")
} else if (error.status == 401) {
return auth.getNewAccessToken(refreshToken)
.pipe(
retry(3),
mergeMap(
(response: any) => {
tokens.accessToken = response.accessToken
sessionStorage.setItem("tokens", JSON.stringify(tokens))
clonHttp = req.clone({ headers: req.headers.append("Authorization", `Bearer ${response.accessToken}`) })
return next.handle(clonHttp)
}
)
)
} else if (error.status == 409) {
return throwError("User not logged")
} else {
if (error.error && error.error.message) {
return throwError(error.error.message)
} else {
return throwError("Check your connection")
}
}
})
)
}
}
Если вы видите, когда вы используете оператор MergeMap, они передают вам только ответ (единственное наблюдаемое) или, по крайней мере, то, что я вижу. Я пытаюсь сказать, что я не вижу, чтобы они использовали его с 2 наблюдаемыми объектами или смешивали 2 наблюдаемых объекта, что я читал в их официальной документации, фактически, в тех примерах, которые они показывают, они всегда используют это с двумя наблюдаемыми.
Честно говоря, мне было слишком сложно понять этот оператор, если бы кто-то мог помочь мне понять его простым способом, я был бы чрезвычайно благодарен, помимо понимания его использования в том коде, который я показываю ранее. Приветствую заранее. Спасибо!
3 ответа
mergeMap
, как и многие другие так называемые операторы отображения высшего порядка, поддерживает одну или несколько внутренних наблюдаемых.
Внутренние наблюдаемый создаются с внешней стоимостью и предоставленной функцией. Внешнее значение по существу только значение, полученное от источника. Например:
of(1, 2, 3).pipe(
mergeMap((outerValue, index) => /* ... return an observable ... */)
).subscribe(); // `outerValue`: 1, 2, 3 (separately)
Когда приходит внешнее значение, создается новая внутренняя наблюдаемая. Думаю, лучший способ понять это - взглянуть на исходный код:
// `value` - the `outerValue`
protected _next(value: T): void {
if (this.active < this.concurrent) {
this._tryNext(value);
} else {
this.buffer.push(value);
}
}
protected _tryNext(value: T) {
let result: ObservableInput<R>;
const index = this.index++;
try {
// Create the inner observable based on the `outerValue` and the provided function (`this.project`)
// `mergeMap(project)`
result = this.project(value, index);
} catch (err) {
this.destination.error(err);
return;
}
this.active++;
// Subscribe to the inner observable
this._innerSub(result, value, index);
}
Пожалуйста, пока не обращайте внимания concurrent
а также buffer
, посмотрим на них чуть позже.
Итак, что происходит, когда излучается внутреннее наблюдаемое? Прежде чем идти дальше, стоит отметить, что, хотя это очевидно, внутренний наблюдаемым требует к внутреннему абоненту. Мы видим это в_innerSub
метод сверху:
private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
// This is where the subscription takes place
subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
}
Когда излучается внутреннее наблюдаемое, notifyNext
метод будет вызываться:
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.destination.next(innerValue);
}
Где пункт назначения указывает на следующего абонента в цепочке. Например, это может быть так:
of(1)
.pipe(
mergeMap(/* ... */)
)
.subscribe({} /* <- this is the `destination` for `mergeMap` */)
Это будет более подробно объяснено ниже в разделе Что насчет следующего подписчика в цепочке.
Итак, что это значит to mix 2 observables
?
Посмотрим на этот пример:
of(2, 3, 1)
.pipe(
mergeMap(outerValue => timer(outerValue).pipe(mapTo(outerValue)))
)
.subscribe(console.log)
/* 1 \n 2 \n 3 */
когда 2
прибывает, mergeMap
подпишется на внутреннее наблюдаемое, которое будет излучать в 200
РС. Это асинхронное действие, но обратите внимание, что внешние значения (2, 3, 1) поступают синхронно. Следующий,3
прибывает и создаст внутреннюю обс. что будет излучать в300
РС. Поскольку текущий сценарий еще не завершен, очередь обратного вызова еще не рассматривается. Сейчас1
прибывает, и создаст внутреннюю обс. что будет излучать в100
РС.
mergeMap
теперь имеет 3 внутренних наблюдаемых и будет передавать внутреннее значение того, что излучает внутреннее наблюдаемое.
Как и ожидалось, получаем1
, 2
, 3
.
Так вот что mergeMap
делает. Смешивание наблюдаемых можно представить следующим образом: если приходит внешнее значение и внутреннее наблюдаемое уже создано, тогдаmergeMap
просто говорит: "нет проблем, я просто создам новый внутренний obs. и подпишусь на него".
Как насчет concurrent
а также buffer
mergeMap
можно дать второй аргумент, concurrent
который указывает, сколько внутренних наблюдаемых следует обрабатывать одновременно. Это количество активных внутренних наблюдаемых отслеживается с помощьюactive
свойство.
Как видно в _next
метод, если active >= concurrent
, то outerValues
будет добавлен в buffer
, которая представляет собой очередь (FIFO
).
Затем, когда одна активная внутренняя наблюдаемая завершается,mergeMap
возьмет самое старое значение из значения и создаст из него внутреннее наблюдаемое, используя предоставленную функцию:
// Called when an inner observable completes
notifyComplete(innerSub: Subscription): void {
const buffer = this.buffer;
this.remove(innerSub);
this.active--;
if (buffer.length > 0) {
this._next(buffer.shift()!); // Create a new inner obs. with the oldest buffered value
} else if (this.active === 0 && this.hasCompleted) {
this.destination.complete();
}
}
Имея это в виду, concatMap(project)
просто mergeMap(project, 1)
.
Итак, если у вас есть:
of(2, 3, 1)
.pipe(
mergeMap(outerValue => timer(outerValue * 100).pipe(mapTo(outerValue)), 1)
)
.subscribe(console.log)
это будет зарегистрировано:
2 \n 3 \n 1
.
А как насчет следующего абонента в цепочке?
Операторы - это функции, которые возвращают другую функцию, которая принимает наблюдаемый объект как единственный параметр и возвращает другое наблюдаемое. Когда поток подписывается, каждая наблюдаемая, возвращаемая оператором, будет иметь своего собственного подписчика.
Всех этих подписчиков можно увидеть в виде связанного списка. Например:
// S{n} -> Subscriber `n`, where `n` depends on the order in which the subscribers are created
of(/* ... */)
.pipe(
operatorA(), // S{4}
operatorB(), // S{3}
operatorC(), // S{2}
).subscribe({ /* ... */ }) // S{1}; the observer is converted into a `Subscriber`
S{n}
является родителем (местом назначения)S{n+1}
, означающий, что S{1}
это пункт назначения S{2}
, S{2}
это пункт назначения S{3}
и так далее.
Неожиданные результаты
Сравните это:
of(2, 1, 0)
.pipe(
mergeMap(v => timer(v * 100).pipe(mapTo(v)))
).subscribe(console.log)
// 0 1 2
of(2, 1, 0)
.pipe(
mergeMap(v => timer(v).pipe(mapTo(v)))
).subscribe(console.log)
// 1 0 2
Согласно MDN:
Указанное время (или задержка) не является гарантированным временем выполнения, а скорее минимальным временем выполнения. Обратные вызовы, которые вы передаете этим функциям, не могут выполняться, пока стек в основном потоке не станет пустым.
Как следствие, такой код, как setTimeout(fn, 0), будет выполняться, как только стек будет пуст, а не сразу. Если вы выполните такой код, как setTimeout(fn, 0), но сразу после выполнения цикла, который насчитывает от 1 до 10 миллиардов, ваш обратный вызов будет выполнен через несколько секунд.
Этот раздел MDN также должен прояснить ситуацию.
Я бы сказал, что это зависит от среды, а не от RxJs.
Во втором фрагменте задержки идут последовательно, поэтому вы получаете неожиданные результаты. Если вы немного увеличите задержки, например:timer(v * 2)
, вы должны получить ожидаемое поведение.
Таким образом, карта слияния в основном используется для одновременного разрешения нескольких внутренних наблюдаемых, и когда разрешены все внутренние наблюдаемые, разрешится внешнее наблюдаемое. Надеюсь, это поможет.
Представьте, что вам нужно прочитать список id
s из некоторого асинхронного источника, будь то удаленная служба, БД, файл в вашей файловой системе.
Представьте, что вам нужно запустить асинхронный запрос для каждого id
чтобы узнать подробности.
Представьте, что вам нужно собрать все детали для каждого id
и заняться чем-нибудь еще.
В конечном итоге у вас будет начальный Obsersable, излучающий список, а затем набор Observable, созданный этим списком. Это было бы вы использовалиmergeMap
.
Код будет выглядеть так
mySourceObs = getIdListFromSomewhere();
myStream = mySourceObs.pipe(
// after you get the list of the ids from your service, you generate a new strem
// which emits all the values of the list via from operator
concatMap(listOfIds => from(listOfIds)),
// for each id you get the details
mergeMap(id => getDetails(id),
)
Если вы подпишетесь на myStream
вы получаете поток подробных данных, по одному для каждого id
исходного списка. Код был бы просто
myStream.subscribe(
detail => {
// do what you have to do with the details of an id
}
)
БОЛЬШЕ О КОДЕ, УТОЧНЕННОМ В ВОПРОСЕ
Мое понимание фрагмента кода с использованием mergeMap
следующее:
- вы получаете новый токен с
auth.getNewAccessToken
- Если что-то пойдет не так, повторите попытку 3 раза
- Когда вы получаете новый токен, вы делаете что-то, а затем клонируете что-то с
next.handle(clonHttp)
Ключевым моментом является то, что оба auth.getNewAccessToken
а также next.handle(clonHttp)
асинхронные вызовы, возвращающие Observable.
В этом случае вы хотите убедиться, что ПЕРВЫЙ вы получите ответ от auth.getNewAccessToken
И ТОЛЬКО ТОГДА звоните next.handle(clonHttp)
.
В этом случае лучший способ кодировать такую логику - использовать concatMap
что гарантирует, что второй Observable будет связан с успешным завершением первого.
mergeMap
а также switchMap
также может работать в этом сценарии, поскольку auth.getNewAccessToken
излучает только ОДИН РАЗ и затем завершается, но правильная семантика задается concatMap
(который, кстати, совпадает с mergeMap
с параллелизмом, равным 1, но это уже другая история).