Обработка оптимистичной блокировки с использованием rxjs

Я пытаюсь обработать оптимистическую блокировку в нашем Angular-приложении, используя rxjs, но я сталкиваюсь с некоторыми проблемами, которые я не могу объяснить.

На моей странице есть подробная информация о кредитной заметке, а также некоторые подробные строки. Когда мы выполняем вызовы API для изменения этой кредитки, мы всегда возвращаем полную кредитку, БЕЗ строк. Строки извлекаются только в исходном распознавателе маршрута.

Я объявил эти наблюдаемые и предметы:

    creditNote$: Observable<CreditNoteDetailWithLines>;
    reloadSubject$: Subject<CreditNoteDetail> = new Subject<CreditNoteDetail>();
    deleteCreditNoteLineSubject$: Subject<any> = new Subject<any>();

И мой метод ngOnInit выглядит так:

    ngOnInit(): void {
        // initial creditnote WITH lines
        let routeDataObservable$: Observable<CreditNoteDetailWithLines> = this.activatedRoute.data.pipe(map(data => data.creditNote));

        // subject contains creditnote WITHOUT lines
        let reloadObservable$: Observable<CreditNoteDetailWithLines> = this.reloadSubject$.pipe(
            // get initial creditnote with lines
            withLatestFrom(routeDataObservable$),
            // copy lines onto fresh creditnote
            map((values: [CreditNoteDetail, CreditNoteDetailWithLines]) => {
                let refreshedData: CreditNoteDetail = values[0];
                let initialData: CreditNoteDetailWithLines = values[1];
                let updated = { ...refreshedData, creditNoteLines: initialData.creditNoteLines };
                console.log(`emitting version ${updated.version}`);
                return updated;
            })
        );

        this.creditNote$ = routeDataObservable$.pipe(merge(reloadObservable$));

        this.deleteCreditNoteLineSubject$
            .pipe(
                concatMap((line: any) => {
                    return Observable.of(line).pipe(
                        tap(x => {
                            console.log(`start execution inside concatmap`);
                        }),
                        withLatestFrom(this.creditNote$),
                        mergeMap((values: [any, CreditNoteDetailWithLines]) => {
                            let line = values[0];
                            let creditNote = values[1];
                            console.log(`Using version: ${creditNote.version}`);
                            return this.creditNoteService.deleteCreditNoteLine(creditNote.id, creditNote.version, line.id);
                        }),
                        tap((updatedCreditNote: CreditNoteDetail) => {
                            console.log(`Updating to version: ${updatedCreditNote.version}`);
                            this.reloadSubject$.next(updatedCreditNote);
                        })
                    );
                })
            )
            .subscribe((creditNote: CreditNoteDetail) => {});
    }

Я ожидаю, что, поскольку я обернул наблюдаемое в оператор concatMap, это всегда будет выполняться последовательно. Поскольку это последовательно, я должен иметь возможность получить поле "версия" последних обновленных данных (это поле версии используется для оптимистической блокировки). Я использую withLatestFrom для получения самых последних версий данных.

Результатом этого является то, что первоначальный вызов работает, но второй вызов не выполняется, потому что у него нет обновленной версии.

Консольный вывод:

start execution inside concatmap
Using version: 181
Updating to version: 182
emitting version 182
emitting version 182
start execution inside concatMap
Usign version: 181

Вывод на консоль говорит мне, что порядок выполнения точно такой, как я ожидал, но все же я не получаю последние данные. Кто-нибудь может объяснить это?

У меня действительно есть обходной путь, но я не доволен им, и я не совсем понимаю, почему это работает:

Если я создаю дополнительный BehaviorSubject:

creditNoteSubject$: BehaviorSubject<CreditNoteDetailWithLines> = new BehaviorSubject<CreditNoteDetailWithLines>(null);

А затем подпишитесь на creditNote $ observable и передайте все выбросы в BehaviorSubject:

routeDataObservable$.pipe(merge(reloadObservable$)).subscribe(cn => this.creditNoteSubject$.next(cn));

Тогда все работает как положено... но почему? Я понимаю разницу между Subject и BehaviorSubject, но это не говорит мне, почему я получаю устаревшую информацию.

Редактировать:

После некоторых поисков и поисков я обнаружил, что это, кажется, очень распространенная вещь в rxjs и исправлена ​​с помощью операторов publishReplay и refCount. Окончательный код в итоге выглядел так:

Объявления:

public creditNote$: Observable<CreditNoteDetailWithLines>;

private deleteCreditNoteLineSubject$: Subject<any> = new Subject<any>();
private reloadCreditNoteSubject$: Subject<ReloadCreditNoteOperation> = new Subject<ReloadCreditNoteOperation>();

private subscriptions: Subscription[] = [];

Код:

    this.creditNote$ = this.reloadCreditNoteSubject$.pipe(
        scan((acc: CreditNoteDetailWithLines, value: ReloadCreditNoteOperation, index: number) => {
            if (value.action == 'creditNote') {
                return { ...value.payload, creditNoteLines: acc.creditNoteLines };
            } else if (value.action == 'creditNoteWithLines') {
                return <CreditNoteDetailWithLines>value.payload;
            }
        }, initialCreditNote),
        startWith(initialCreditNote),
        publishReplay(1),
        refCount()
    );

    this.subscriptions.push(
        this.deleteCreditNoteLineSubject$
            .pipe(
                // use concatMap to force sequential execution
                // -> we need the updated version of the previous api call
                concatMap((line: any) => {
                    return Observable.of(line).pipe(
                        withLatestFrom(this.creditNote$),
                        mergeMap((values: [any, CreditNoteDetailWithLines]) => {
                            let line = values[0];
                            let creditNote = values[1];
                            return this.creditNoteService.deleteCreditNoteLine(creditNote.id, creditNote.version, line.id);
                        }),
                        withLatestFrom(this.creditNote$),
                        tap(([updatedCreditNoteDetail, creditNoteWithLines]) => {
                            let updated = {
                                ...updatedCreditNoteDetail,
                                creditNoteLines: creditNoteWithLines.creditNoteLines.filter(l => l.id != line.id)
                            };
                            this.updateCreditNoteWithLines(updated);
                        })
                    );
                })
            )
            .subscribe(values => {})
    );

На самом деле довольно забавно видеть, что эти запросы выполняются последовательно с использованием оператора concatMap.

0 ответов

Другие вопросы по тегам