Обработка оптимистичной блокировки с использованием rxjs
Я пытаюсь обработать оптимистическую блокировку в нашем Angular-приложении, используя rxjs, но я сталкиваюсь с некоторыми проблемами, которые я не могу объяснить.
На моей странице есть подробная информация о кредитной заметке, а также некоторые подробные строки. Когда мы выполняем вызовы API для изменения этой кредитки, мы всегда возвращаем полную кредитку, БЕЗ строк. Строки извлекаются только в исходном распознавателе маршрута.
Я объявил эти наблюдаемые и предметы:
creditNote$: Observable<CreditNoteDetailWithLines>;
reloadSubject$: Subject<CreditNoteDetail> = new Subject<CreditNoteDetail>();
deleteCreditNoteLineSubject$: Subject<any> = new Subject<any>();
И мой метод ngOnInit выглядит так:
ngOnInit(): void {
// initial creditnote WITH lines
let routeDataObservable$: Observable<CreditNoteDetailWithLines> = this.activatedRoute.data.pipe(map(data => data.creditNote));
// subject contains creditnote WITHOUT lines
let reloadObservable$: Observable<CreditNoteDetailWithLines> = this.reloadSubject$.pipe(
// get initial creditnote with lines
withLatestFrom(routeDataObservable$),
// copy lines onto fresh creditnote
map((values: [CreditNoteDetail, CreditNoteDetailWithLines]) => {
let refreshedData: CreditNoteDetail = values[0];
let initialData: CreditNoteDetailWithLines = values[1];
let updated = { ...refreshedData, creditNoteLines: initialData.creditNoteLines };
console.log(`emitting version ${updated.version}`);
return updated;
})
);
this.creditNote$ = routeDataObservable$.pipe(merge(reloadObservable$));
this.deleteCreditNoteLineSubject$
.pipe(
concatMap((line: any) => {
return Observable.of(line).pipe(
tap(x => {
console.log(`start execution inside concatmap`);
}),
withLatestFrom(this.creditNote$),
mergeMap((values: [any, CreditNoteDetailWithLines]) => {
let line = values[0];
let creditNote = values[1];
console.log(`Using version: ${creditNote.version}`);
return this.creditNoteService.deleteCreditNoteLine(creditNote.id, creditNote.version, line.id);
}),
tap((updatedCreditNote: CreditNoteDetail) => {
console.log(`Updating to version: ${updatedCreditNote.version}`);
this.reloadSubject$.next(updatedCreditNote);
})
);
})
)
.subscribe((creditNote: CreditNoteDetail) => {});
}
Я ожидаю, что, поскольку я обернул наблюдаемое в оператор concatMap, это всегда будет выполняться последовательно. Поскольку это последовательно, я должен иметь возможность получить поле "версия" последних обновленных данных (это поле версии используется для оптимистической блокировки). Я использую withLatestFrom для получения самых последних версий данных.
Результатом этого является то, что первоначальный вызов работает, но второй вызов не выполняется, потому что у него нет обновленной версии.
Консольный вывод:
start execution inside concatmap
Using version: 181
Updating to version: 182
emitting version 182
emitting version 182
start execution inside concatMap
Usign version: 181
Вывод на консоль говорит мне, что порядок выполнения точно такой, как я ожидал, но все же я не получаю последние данные. Кто-нибудь может объяснить это?
У меня действительно есть обходной путь, но я не доволен им, и я не совсем понимаю, почему это работает:
Если я создаю дополнительный BehaviorSubject:
creditNoteSubject$: BehaviorSubject<CreditNoteDetailWithLines> = new BehaviorSubject<CreditNoteDetailWithLines>(null);
А затем подпишитесь на creditNote $ observable и передайте все выбросы в BehaviorSubject:
routeDataObservable$.pipe(merge(reloadObservable$)).subscribe(cn => this.creditNoteSubject$.next(cn));
Тогда все работает как положено... но почему? Я понимаю разницу между Subject и BehaviorSubject, но это не говорит мне, почему я получаю устаревшую информацию.
Редактировать:
После некоторых поисков и поисков я обнаружил, что это, кажется, очень распространенная вещь в rxjs и исправлена с помощью операторов publishReplay и refCount. Окончательный код в итоге выглядел так:
Объявления:
public creditNote$: Observable<CreditNoteDetailWithLines>;
private deleteCreditNoteLineSubject$: Subject<any> = new Subject<any>();
private reloadCreditNoteSubject$: Subject<ReloadCreditNoteOperation> = new Subject<ReloadCreditNoteOperation>();
private subscriptions: Subscription[] = [];
Код:
this.creditNote$ = this.reloadCreditNoteSubject$.pipe(
scan((acc: CreditNoteDetailWithLines, value: ReloadCreditNoteOperation, index: number) => {
if (value.action == 'creditNote') {
return { ...value.payload, creditNoteLines: acc.creditNoteLines };
} else if (value.action == 'creditNoteWithLines') {
return <CreditNoteDetailWithLines>value.payload;
}
}, initialCreditNote),
startWith(initialCreditNote),
publishReplay(1),
refCount()
);
this.subscriptions.push(
this.deleteCreditNoteLineSubject$
.pipe(
// use concatMap to force sequential execution
// -> we need the updated version of the previous api call
concatMap((line: any) => {
return Observable.of(line).pipe(
withLatestFrom(this.creditNote$),
mergeMap((values: [any, CreditNoteDetailWithLines]) => {
let line = values[0];
let creditNote = values[1];
return this.creditNoteService.deleteCreditNoteLine(creditNote.id, creditNote.version, line.id);
}),
withLatestFrom(this.creditNote$),
tap(([updatedCreditNoteDetail, creditNoteWithLines]) => {
let updated = {
...updatedCreditNoteDetail,
creditNoteLines: creditNoteWithLines.creditNoteLines.filter(l => l.id != line.id)
};
this.updateCreditNoteWithLines(updated);
})
);
})
)
.subscribe(values => {})
);
На самом деле довольно забавно видеть, что эти запросы выполняются последовательно с использованием оператора concatMap.