Управление устаревшими операциями чтения HTTP с помощью RxJS 5 (Angular 4)

В остальном у меня довольно стандартный SPA, построенный на Angular 4, взаимодействующий с REST API.

Всякий раз, когда приложение выполняет операцию записи в API (put/post/delete), я хочу прервать все операции чтения (get).

Я знаю, что могу добиться этого с помощью оператора takeUntil.

const write$ = new Subject<Date>();
public get(): Observable<Response> {
   return http.get().takeUntil(write$);
}

public update(): Observable<Response> {
   write$.next(new Date());
   return http.post();
}

То, что я хочу сделать (и я борюсь с этим), расширяет это так, чтобы:

  1. Прерванные операции чтения повторяются, если нет активных операций записи.
  2. Операции чтения, отправленные во время выполнения операции записи, буферизуются (вызывают ожидание), пока нет активных операций записи.

Случай использования, я думаю, довольно распространен. Всякий раз, когда пользователь изменяет сущность, я немедленно обновляю состояние этой сущности (в отличие от ожидания завершения операции записи). Это делает пользовательский интерфейс более отзывчивым. Операции чтения, выполняемые перед записью или начинающиеся после записи, но завершенные до завершения записи, приводят к тому, что пользовательский интерфейс переворачивает объект обратно в его старое состояние.

Важно, чтобы механизм прерывания / буферизации / ожидания при операциях чтения был без потерь.

***** ОБНОВЛЕНИЕ *****

Я придумал что-то, что, кажется, работает, но это кажется более сложным, чем мне бы хотелось. В дополнение к альтернативным решениям первоначально предложенной проблемы, я был бы заинтересован в обратной связи по следующим вопросам:

class ApiService {
    private asyncDelay = 25;
    private wait$ = new BehaviorSubject<Observable<Date>>(new BehaviorSubject<Date>(new Date()));
    private read$ = new Subject<Observable<Response>>();
    private buffer$ = new ReplaySubject<Observable<Response>>(1);

    constructor() {    
        const flush$ = new Subject<any>();
        this.read$.bufferToggle(this.wait$, () => flush$)
            .do(x => console.warn('FLUSH: ' + x.length))
            .switchMap(items => Observable.from(items))
            .subscribe(this.buffer$);

        this.wait$.switchMap(resume$ => {
            console.warn('WAIT')
            return resume$.switchMap(() => {
                console.warn('RESUMED')
                flush$.next(new Date());
                return Observable.merge(this.buffer$, this.read$)
                    .mergeAll();
            });
        })
        .subscribe();
    }

    protected get(url: string, options?: RequestOptionsArgs): Observable<Response> {         
        const abort$ = this.wait$
            .skip(1)
            .take(1)
            .publishReplay(1)
            .do(() => console.warn('ABORT'));        

        const source$ = this.http.get(url, options) 
            .takeUntil(abort$)           
            .publishReplay(1);

        const retry$ = abort$.switchMap(() => Observable.timer(this.asyncDelay)
            .switchMap(() => this.get(url, options)))
            .takeUntil(source$)
            .publishReplay(1);

        this.read$.next(new Observable((observer: Observer<Response>) => {
            source$.catch(err => Observable.of(err))
              .subscribe(observer);

            source$.connect();
        }));        

        return Observable.race(source$, retry$)
            .take(1);
  }

  protected put(url: string, body: any, options?: RequestOptionsArgs): Observable<Response> {
        const resume$ = new ReplaySubject<Date>(1);
        this.wait$.next(resume$.delay(this.asyncDelay).take(1));
        return this.http.put(url, body, options)
            .finally(() => resume$.next(new Date()));
  }

}

Идея заключается в том, что операции чтения [get()] возвращаются в опубликованном / отключенном состоянии до тех пор, пока не будет подтверждено, что они имеют право на выполнение (нет ожидающих операций записи).

Подписки в конструкторе выполняются на протяжении всего срока службы приложения.

Любая операция записи [put()] приводит к эмиссии потоком ожидания $. Эмиссия является потоком возобновления $, который испускается ровно один раз, когда завершается операция записи, инициировавшая эмиссию ожидания $.

Каждый раз, когда генерируется поток ожидания $, самая внутренняя подписка, ответственная за подключение операций чтения $, останавливается - дополнительно открывается подписка bufferToggled read $.

switchMap из потока ожидания $ в его возобновление $ эмиссии гарантирует, что самая внутренняя подписка, ответственная за подключение операций чтения $, не будет перезапущена, пока не завершится операция записи, которая остановила предыдущую подписку.

если последующая операция записи происходит до завершения предыдущей, поток ожидания $ просто перезапускается, в конечном итоге ожидая новый сигнал возобновления $. Когда я набираю это, я понимаю, что делается одно предположение, что операции записи будут вести себя несколько LILO-образом, когда дело доходит до времени завершения. Я думаю, что это нормально в моем конкретном случае использования.

После того, как будет выпущен последний отправленный возобновленный $ stream, буфер $ очищается, закрывается и объединяется с потоком новых операций чтения $ в реальном времени.

Поток $ source, возвращаемый вызовом get() подключен к завершению в случае начала операции записи во время ее выполнения. Кроме того, я создаю поток повторных попыток, который в конечном итоге начинается с выброса из потока ожидания. Я участвую в гонке () $ source и retry$ stream... в этом случае они взаимоисключающие - только один из них может излучать. Если источник $ завершен, тот же сигнал, который отвечает за его завершение, запускает последовательность $ retry, которая возвращается в get() и в конечном итоге приводит к буферизации ответа повторной попытки до завершения всех операций записи.

... и цикл повторяется.

Я не очень много знаю об основах RXJS или о накладных расходах, связанных с записью всех этих операторов и потоков вместе в качестве средства для достижения цели. В данный момент меня больше всего беспокоит утечка памяти и, очевидно, читаемость для бедной души, которой нужно изменить ее через 12 с лишним месяцев.

0 ответов

Другие вопросы по тегам