Как правильно передать результат сетевого вызова Angular Http в RxJs 5?

Используя Http, мы вызываем метод, который выполняет сетевой вызов и возвращает наблюдаемый http:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

Если мы возьмем это наблюдаемое и добавим к нему несколько подписчиков:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

Что мы хотим сделать, так это убедиться, что это не вызывает множественных сетевых запросов.

Это может показаться необычным сценарием, но на самом деле он довольно распространен: например, если вызывающий абонент подписывается на наблюдаемое для отображения сообщения об ошибке и передает его в шаблон с помощью асинхронного канала, у нас уже есть два подписчика.

Как правильно сделать это в RxJs 5?

А именно, это, кажется, работает нормально:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

Но это идиоматический способ сделать это в RxJs 5, или мы должны вместо этого сделать что-то еще?

Примечание: согласно Angular 5 new HttpClient , .map(res => res.json()) часть во всех примерах теперь бесполезна, так как результат JSON теперь принят по умолчанию.

23 ответа

Кэшируйте данные и, если доступно, кэшируйте, возвращайте это, иначе сделайте HTTP-запрос.

import {Injectable} from '@angular/core';
import {Http, Headers} from '@angular/http';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of'; //proper way to import the 'of' operator
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/map';
import {Data} from './data';

@Injectable()
export class DataService {
  private url:string = 'https://cors-test.appspot.com/test';

  private data: Data;
  private observable: Observable<any>;

  constructor(private http:Http) {}

  getData() {
    if(this.data) {
      // if `data` is available just return it as `Observable`
      return Observable.of(this.data); 
    } else if(this.observable) {
      // if `this.observable` is set then the request is in progress
      // return the `Observable` for the ongoing request
      return this.observable;
    } else {
      // example header (not necessary)
      let headers = new Headers();
      headers.append('Content-Type', 'application/json');
      // create the request, store the `Observable` for subsequent subscribers
      this.observable = this.http.get(this.url, {
        headers: headers
      })
      .map(response =>  {
        // when the cached data is available we don't need the `Observable` reference anymore
        this.observable = null;

        if(response.status == 400) {
          return "FAILURE";
        } else if(response.status == 200) {
          this.data = new Data(response.json());
          return this.data;
        }
        // make it shared so more than one subscriber can get the result
      })
      .share();
      return this.observable;
    }
  }
}

Пример плунжера

Этот артиллерийский https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html отличный пример того, как кешировать shareReplay,

Согласно предложению @Cristian, это один из способов, который хорошо работает для наблюдаемых HTTP-данных, которые излучают только один раз, а затем завершают:

getCustomer() {
    return this.http.get('/someUrl')
        .map(res => res.json()).publishLast().refCount();
}

ОБНОВЛЕНИЕ: Бен Леш говорит, что в следующем небольшом выпуске после 5.2.0 вы сможете просто вызвать функцию shareReplay() для кеширования.

РАНЕЕ.....

Во-первых, не используйте share() или publishReplay(1).refCount(), они одинаковы, и проблема с ним в том, что он разделяет только если соединения установлены, когда наблюдаемая активна, если вы подключаетесь после ее завершения, он снова создает новый наблюдаемый, перевод, а не кеширование.

Birowski дал правильное решение выше, которое заключается в использовании ReplaySubject. ReplaySubject будет кэшировать значения, которые вы ему задаете (bufferSize) в нашем случае 1. Он не создаст новую наблюдаемую информацию, например share(), когда refCount достигнет нуля, и вы создадите новое соединение, что является правильным поведением для кэширования.

Вот многоразовая функция

export function cacheable<T>(o: Observable<T>): Observable<T> {
  let replay = new ReplaySubject<T>(1);
  o.subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  return replay.asObservable();
}

Вот как это использовать

import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';

@Injectable()
export class SettingsService {
  _cache: Observable<any>;
  constructor(private _http: Http, ) { }

  refresh = () => {
    if (this._cache) {
      return this._cache;
    }
    return this._cache = cacheable<any>(this._http.get('YOUR URL'));
  }
}

Ниже приведена более продвинутая версия кешируемой функции. Эта позволяет иметь собственную таблицу поиска + возможность предоставления пользовательской таблицы поиска. Таким образом, вам не нужно проверять this._cache, как в примере выше. Также обратите внимание, что вместо передачи наблюдаемого в качестве первого аргумента, вы передаете функцию, которая возвращает наблюдаемые, это потому, что Http Angular выполняет сразу, поэтому, возвращая лениво выполненную функцию, мы можем решить не вызывать ее, если она уже находится в наш кеш.

let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
  if (!!key && (customCache || cacheableCache)[key]) {
    return (customCache || cacheableCache)[key] as Observable<T>;
  }
  let replay = new ReplaySubject<T>(1);
  returnObservable().subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  let observable = replay.asObservable();
  if (!!key) {
    if (!!customCache) {
      customCache[key] = observable;
    } else {
      cacheableCache[key] = observable;
    }
  }
  return observable;
}

Использование:

getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")

В rxjs 5.4.0 появился новый метод shareReplay.

Автор прямо говорит, что "идеально подходит для обработки таких вещей, как кэширование результатов AJAX"

rxjs PR # 2443 feat (shareReplay): добавляет shareReplay вариант publishReplay

shareReplay возвращает наблюдаемое, являющееся источником многоадресной передачи через ReplaySubject. Тема воспроизведения повторяется по ошибке источника, но не по завершении источника. Это делает shareReplay идеальным для обработки таких вещей, как кэширование результатов AJAX, так как оно повторяется. Это повторное поведение, однако, отличается от общего тем, что оно не будет повторять наблюдаемый источник, а будет повторять значения наблюдаемой источника.

В rxjs версии 5.4.0 (2017-05-09) добавлена ​​поддержка shareReplay.

Зачем использовать shareReplay?

Как правило, вы хотите использовать shareReplay, когда у вас есть побочные эффекты или налоги вычислений, которые вы не хотите выполнять среди нескольких подписчиков. Это также может быть полезно в ситуациях, когда вы знаете, что у вас будут поздние подписчики на поток, которому необходим доступ к ранее выданным значениям. Эта способность воспроизводить значения в подписке - это то, что отличает share и shareReplay.

Вы можете легко изменить угловой сервис, чтобы использовать его и возвращать наблюдаемое с кэшированным результатом, который только когда-либо сделает вызов http один раз (при условии, что первый вызов был успешным).

Пример угловой службы

Вот очень простое обслуживание клиентов, которое использует shareReplay,

customer.service.ts

import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';

@Injectable()
export class CustomerService {

    private readonly _getCustomers: Observable<ICustomer[]>;

    constructor(private readonly http: HttpClient) {
        this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
    }

    getCustomers() : Observable<ICustomer[]> {
        return this._getCustomers;
    }
}

export interface ICustomer {
  /* ICustomer interface fields defined here */
}

Обратите внимание, что присвоение в конструкторе может быть перемещено в метод getCustomers но как наблюдаемые вернулись из HttpClient "холодно", делать это в конструкторе приемлемо, так как вызов http будет выполняться только каждый раз при первом вызове subscribe ,

Также предполагается, что исходные возвращенные данные не устаревают в течение времени жизни экземпляра приложения.

В соответствии с этой статьей

Оказывается, мы можем легко добавить кеширование в наблюдаемое, добавив publishReplay(1) и refCount.

так внутри, если заявления просто добавить

.publishReplay(1)
.refCount();

в .map(...)

Я пометил вопрос, но постараюсь попробовать.

//this will be the shared observable that 
//anyone can subscribe to, get the value, 
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);

getCustomer().subscribe(customer$);

//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));

//here's the second subscriber
setTimeout(() => {
  customer$.subscribe(val => console.log('subscriber 2: ' + val));  
}, 1000);

function getCustomer() {
  return new Rx.Observable(observer => {
    console.log('api request');
    setTimeout(() => {
      console.log('api response');
      observer.next('customer object');
      observer.complete();
    }, 500);
  });
}

Вот доказательство:)

Есть только один вынос: getCustomer().subscribe(customer$)

Мы не подписываемся на ответ API getCustomer()мы подписываемся на ReplaySubject, который является наблюдаемым, который также может подписаться на другое Observable и (и это важно) удерживать его последнее переданное значение и повторно опубликовать его любому из его (ReplaySubject's) подписчиков.

Я нашел способ сохранить результат http get в sessionStorage и использовать его для сеанса, чтобы он никогда больше не вызывал сервер.

Я использовал его для вызова API github, чтобы избежать ограничения использования.

@Injectable()
export class HttpCache {
  constructor(private http: Http) {}

  get(url: string): Observable<any> {
    let cached: any;
    if (cached === sessionStorage.getItem(url)) {
      return Observable.of(JSON.parse(cached));
    } else {
      return this.http.get(url)
        .map(resp => {
          sessionStorage.setItem(url, resp.text());
          return resp.json();
        });
    }
  }
}

К вашему сведению, предел сессии - 5 млн. (Или 4,75 млн.). Таким образом, он не должен использоваться таким образом для большого набора данных.

------ редактировать -------------
Если вы хотите обновить данные с помощью F5, которая использует данные памяти вместо sessionStorage;

@Injectable()
export class HttpCache {
  cached: any = {};  // this will store data
  constructor(private http: Http) {}

  get(url: string): Observable<any> {
    if (this.cached[url]) {
      return Observable.of(this.cached[url]));
    } else {
      return this.http.get(url)
        .map(resp => {
          this.cached[url] = resp.text();
          return resp.json();
        });
    }
  }
}

Реализация, которую вы выберете, будет зависеть от того, хотите ли вы отменить подписку (), чтобы отменить ваш HTTP-запрос или нет.

В любом случае, декораторы TypeScript - хороший способ стандартизировать поведение. Это тот, который я написал:

  @CacheObservableArgsKey
  getMyThing(id: string): Observable<any> {
    return this.http.get('things/'+id);
  }

Определение декоратора:

/**
 * Decorator that replays and connects to the Observable returned from the function.
 * Caches the result using all arguments to form a key.
 * @param target
 * @param name
 * @param descriptor
 * @returns {PropertyDescriptor}
 */
export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) {
  const originalFunc = descriptor.value;
  const cacheMap = new Map<string, any>();
  descriptor.value = function(this: any, ...args: any[]): any {
    const key = args.join('::');

    let returnValue = cacheMap.get(key);
    if (returnValue !== undefined) {
      console.log(`${name} cache-hit ${key}`, returnValue);
      return returnValue;
    }

    returnValue = originalFunc.apply(this, args);
    console.log(`${name} cache-miss ${key} new`, returnValue);
    if (returnValue instanceof Observable) {
      returnValue = returnValue.publishReplay(1);
      returnValue.connect();
    }
    else {
      console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue);
    }
    cacheMap.set(key, returnValue);
    return returnValue;
  };

  return descriptor;
}

Кэшируемые данные ответа HTTP с использованием Rxjs Observer/Observable + Caching + Subscription

Смотрите код ниже

* Отказ от ответственности: я новичок в rxjs, поэтому имейте в виду, что я могу неправильно использовать подход наблюдаемого / наблюдателя. Мое решение - это просто совокупность других решений, которые я нашел, и является следствием того, что не удалось найти простое, хорошо документированное решение. Таким образом, я предоставляю свое полное решение для кода (как я хотел бы найти) в надежде, что оно поможет другим.

* обратите внимание, что этот подход в значительной степени основан на GoogleFirebaseObservables. К сожалению, мне не хватает надлежащего опыта / времени, чтобы повторить то, что они сделали под капотом. Но ниже приведен упрощенный способ предоставления асинхронного доступа к некоторым данным в кэше.

Ситуация: компоненту "список товаров" поручено отображение списка товаров. Сайт представляет собой одностраничное веб-приложение с некоторыми кнопками меню, которые будут "фильтровать" продукты, отображаемые на странице.

Решение: компонент "подписывается" на сервисный метод. Метод service возвращает массив объектов продукта, к которым компонент обращается через обратный вызов подписки. Сервисный метод оборачивает свою активность во вновь созданном Обозревателе и возвращает наблюдателя. Внутри этого наблюдателя он ищет кэшированные данные, передает их обратно подписчику (компоненту) и возвращает. В противном случае он отправляет вызов http для извлечения данных, подписывается на ответ, где вы можете обработать эти данные (например, сопоставить данные с вашей собственной моделью), а затем передать данные обратно подписчику.

Код

продакт-list.component.ts

import { Component, OnInit, Input } from '@angular/core';
import { ProductService } from '../../../services/product.service';
import { Product, ProductResponse } from '../../../models/Product';

@Component({
  selector: 'app-product-list',
  templateUrl: './product-list.component.html',
  styleUrls: ['./product-list.component.scss']
})
export class ProductListComponent implements OnInit {
  products: Product[];

  constructor(
    private productService: ProductService
  ) { }

  ngOnInit() {
    console.log('product-list init...');
    this.productService.getProducts().subscribe(products => {
      console.log('product-list received updated products');
      this.products = products;
    });
  }
}

product.service.ts

import { Injectable } from '@angular/core';
import { Http, Headers } from '@angular/http';
import { Observable, Observer } from 'rxjs';
import 'rxjs/add/operator/map';
import { Product, ProductResponse } from '../models/Product';

@Injectable()
export class ProductService {
  products: Product[];

  constructor(
    private http:Http
  ) {
    console.log('product service init.  calling http to get products...');

  }

  getProducts():Observable<Product[]>{
    //wrap getProducts around an Observable to make it async.
    let productsObservable$ = Observable.create((observer: Observer<Product[]>) => {
      //return products if it was previously fetched
      if(this.products){
        console.log('## returning existing products');
        observer.next(this.products);
        return observer.complete();

      }
      //Fetch products from REST API
      console.log('** products do not yet exist; fetching from rest api...');
      let headers = new Headers();
      this.http.get('http://localhost:3000/products/',  {headers: headers})
      .map(res => res.json()).subscribe((response:ProductResponse) => {
        console.log('productResponse: ', response);
        let productlist = Product.fromJsonList(response.products); //convert service observable to product[]
        this.products = productlist;
        observer.next(productlist);
      });
    }); 
    return productsObservable$;
  }
}

product.ts (модель)

export interface ProductResponse {
  success: boolean;
  msg: string;
  products: Product[];
}

export class Product {
  product_id: number;
  sku: string;
  product_title: string;
  ..etc...

  constructor(product_id: number,
    sku: string,
    product_title: string,
    ...etc...
  ){
    //typescript will not autoassign the formal parameters to related properties for exported classes.
    this.product_id = product_id;
    this.sku = sku;
    this.product_title = product_title;
    ...etc...
  }



  //Class method to convert products within http response to pure array of Product objects.
  //Caller: product.service:getProducts()
  static fromJsonList(products:any): Product[] {
    let mappedArray = products.map(Product.fromJson);
    return mappedArray;
  }

  //add more parameters depending on your database entries and constructor
  static fromJson({ 
      product_id,
      sku,
      product_title,
      ...etc...
  }): Product {
    return new Product(
      product_id,
      sku,
      product_title,
      ...etc...
    );
  }
}

Вот пример вывода, который я вижу, когда загружаю страницу в Chrome. Обратите внимание, что при первоначальной загрузке продукты выбираются из http (позвоните в службу отдыха моего узла, которая работает локально на порте 3000). Когда я нажимаю, чтобы перейти к "фильтрованному" представлению товаров, товары находятся в кеше.

Мой журнал Chrome (консоль):

core.es5.js:2925 Angular is running in the development mode. Call enableProdMode() to enable the production mode.
app.component.ts:19 app.component url: /products
product.service.ts:15 product service init.  calling http to get products...
product-list.component.ts:18 product-list init...
product.service.ts:29 ** products do not yet exist; fetching from rest api...
product.service.ts:33 productResponse:  {success: true, msg: "Products found", products: Array(23)}
product-list.component.ts:20 product-list received updated products

... [нажал кнопку меню, чтобы отфильтровать продукты]...

app.component.ts:19 app.component url: /products/chocolatechip
product-list.component.ts:18 product-list init...
product.service.ts:24 ## returning existing products
product-list.component.ts:20 product-list received updated products

Вывод: это самый простой способ, который я нашел (на данный момент) для реализации кэшируемых данных ответов http. В моем угловом приложении каждый раз, когда я перехожу к другому виду продуктов, компонент списка продуктов перезагружается. ProductService, по-видимому, является общим экземпляром, поэтому локальный кэш "products: Product[]" в ProductService сохраняется во время навигации, и последующие вызовы "GetProducts()" возвращают кэшированное значение. В заключение я прочитал комментарии о том, как наблюдаемые / подписки должны быть закрыты, когда вы закончите, чтобы предотвратить "утечки памяти". Я не включил это здесь, но об этом нужно помнить.

Отличные ответы.

Или вы могли бы сделать это:

Это из последней версии rxjs. Я использую 5.5.7 версию RxJS

import {share} from "rxjs/operators";

this.http.get('/someUrl').pipe(share());

Я предполагаю, что http://github.com/ngx-cache/core может быть полезен для поддержки функций кэширования для http-вызовов, особенно если HTTP-вызов выполняется на платформах браузера и сервера.

Допустим, у нас есть следующий метод:

getCustomer() {
  return this.http.get('/someUrl').map(res => res.json());
}

Вы можете использовать Cached декоратор http://github.com/ngx-cache/core для хранения возвращенного значения из метода, выполняющего HTTP-вызов на cache storage (storage можно настроить, пожалуйста, проверьте реализацию в https://github.com/ng-seed/universal) - прямо при первом выполнении. В следующий раз, когда метод вызывается (независимо от браузера или серверной платформы), значение извлекается из cache storage,

import { Cached } from '@ngx-cache/core';

...

@Cached('get-customer') // the cache key/identifier
getCustomer() {
  return this.http.get('/someUrl').map(res => res.json());
}

Также есть возможность использовать методы кеширования (has, get, set) используя API кеширования.

anyclass.ts

...
import { CacheService } from '@ngx-cache/core';

@Injectable()
export class AnyClass {
  constructor(private readonly cache: CacheService) {
    // note that CacheService is injected into a private property of AnyClass
  }

  // will retrieve 'some string value'
  getSomeStringValue(): string {
    if (this.cache.has('some-string'))
      return this.cache.get('some-string');

    this.cache.set('some-string', 'some string value');
    return 'some string value';
  }
}

Вот список пакетов, как для кэширования на стороне клиента, так и на стороне сервера:

Что мы хотим сделать, так это убедиться, что это не вызывает множественных сетевых запросов.

Мой личный фаворит - использовать async методы для вызовов, которые делают сетевые запросы. Сами методы не возвращают значение, а обновляют BehaviorSubject в том же сервисе, на которые будут подписываться компоненты.

Теперь зачем использовать BehaviorSubject вместо Observable? Так как,

  • При подписке BehaviorSubject возвращает последнее значение, тогда как обычная наблюдаемая срабатывает только при получении onnext,
  • Если вы хотите получить последнее значение BehaviorSubject в ненаблюдаемом коде (без подписки), вы можете использовать getValue() метод.

Пример:

customer.service.ts

public customers$: BehaviorSubject<Customer[]> = new BehaviorSubject([]);

public async getCustomers(): Promise<void> {
    let customers = await this.httpClient.post<LogEntry[]>(this.endPoint, criteria).toPromise();
    if (customers) 
        this.customers$.next(customers);
}

Тогда, где требуется, мы можем просто подписаться на customers$,

public ngOnInit(): void {
    this.customerService.customers$
    .subscribe((customers: Customer[]) => this.customerList = customers);
}

Или, может быть, вы хотите использовать его непосредственно в шаблоне

<li *ngFor="let customer of customerService.customers$ | async"> ... </li>

Так что теперь, пока вы не сделаете еще один звонок getCustomersданные сохраняются в customers$ BehaviorSubject.

Так что, если вы хотите обновить эти данные? просто позвоните getCustomers()

public async refresh(): Promise<void> {
    try {
      await this.customerService.getCustomers();
    } 
    catch (e) {
      // request failed, handle exception
      console.error(e);
    }
}

Используя этот метод, нам не нужно явно сохранять данные между последующими сетевыми вызовами, так как они обрабатываются BehaviorSubject,

PS: Обычно, когда компонент уничтожается, хорошей практикой является избавление от подписок, для этого вы можете использовать метод, предложенный в этом ответе.

Вы можете создать простой класс Cacheable<>, который помогает управлять данными, полученными с http-сервера несколькими подписчиками:

declare type GetDataHandler<T> = () => Observable<T>;

export class Cacheable<T> {

    protected data: T;
    protected subjectData: Subject<T>;
    protected observableData: Observable<T>;
    public getHandler: GetDataHandler<T>;

    constructor() {
      this.subjectData = new ReplaySubject(1);
      this.observableData = this.subjectData.asObservable();
    }

    public getData(): Observable<T> {
      if (!this.getHandler) {
        throw new Error("getHandler is not defined");
      }
      if (!this.data) {
        this.getHandler().map((r: T) => {
          this.data = r;
          return r;
        }).subscribe(
          result => this.subjectData.next(result),
          err => this.subjectData.error(err)
        );
      }
      return this.observableData;
    }

    public resetCache(): void {
      this.data = null;
    }

    public refresh(): void {
      this.resetCache();
      this.getData();
    }

}

использование

Объявите объект Cacheable<> (предположительно как часть службы):

list: Cacheable<string> = new Cacheable<string>();

и обработчик:

this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}

Звонок из компонента:

//gets data from server
List.getData().subscribe(…)

Вы можете подписаться на несколько компонентов.

Более подробная информация и пример кода находятся здесь: http://devinstance.net/articles/20171021/rxjs-cacheable

Большинство приведенных выше ответов подходят для http-запросов, которые не принимают входные данные. Каждый раз, когда вы хотите сделать вызов API, используя какой-либо ввод, запрос необходимо создавать заново. Единственный ответ выше, который может справиться с этим, - это ответ @Arlo .

Я создал немного более простой декоратор, который вы можете использовать, чтобы поделиться ответом с каждым вызывающим абонентом, который имеет одинаковый ввод. В отличие от ответа Арло, он не воспроизводит ответы задержанным подписчикам, а обрабатывает одновременные запросы как один. Если цель состоит в том, чтобы воспроизвести ответы задержанным наблюдателям (также известные как кэшированные ответы), вы можете изменить приведенный ниже код и заменить share()с shareReplay(1):

https://gist.github.com/OysteinAmundsen/b97a2359292463feb8c0e2270ed6695a

      import { finalize, Observable, share } from 'rxjs';

export function SharedObservable(): MethodDecorator {
  const obs$ = new Map<string, Observable<any>>();
  return (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) => {
    const originalMethod = descriptor.value;
    descriptor.value = function (...args: any[]) {
      const key = JSON.stringify(args);
      if (!obs$.has(key)) {
        // We have no observable for this key yet, so we create one
        const res = originalMethod.apply(this, args).pipe(
          share(), // Make the observable hot
          finalize(() => obs$.delete(key)) // Cleanup when observable is complete
        );
        obs$.set(key, res);
      }
      // Return the cached observable
      return obs$.get(key);
    };
    return descriptor;
  };
}

ПРИМЕНЕНИЕ:

      @SharedObservable()
myFunc(id: number): Observable<any> {
  return this.http.get<any>(`/api/someUrl/${id}`);
}

Это .publishReplay(1).refCount(); или же .publishLast().refCount(); Так как Angular Http наблюдаемые завершены после запроса.

Этот простой класс кэширует результат, так что вы можете подписаться на.value много раз и делает только 1 запрос. Вы также можете использовать.reload(), чтобы сделать новый запрос и опубликовать данные.

Вы можете использовать его как:

let res = new RestResource(() => this.http.get('inline.bundleo.js'));

res.status.subscribe((loading)=>{
    console.log('STATUS=',loading);
});

res.value.subscribe((value) => {
  console.log('VALUE=', value);
});

и источник:

export class RestResource {

  static readonly LOADING: string = 'RestResource_Loading';
  static readonly ERROR: string = 'RestResource_Error';
  static readonly IDLE: string = 'RestResource_Idle';

  public value: Observable<any>;
  public status: Observable<string>;
  private loadStatus: Observer<any>;

  private reloader: Observable<any>;
  private reloadTrigger: Observer<any>;

  constructor(requestObservableFn: () => Observable<any>) {
    this.status = Observable.create((o) => {
      this.loadStatus = o;
    });

    this.reloader = Observable.create((o: Observer<any>) => {
      this.reloadTrigger = o;
    });

    this.value = this.reloader.startWith(null).switchMap(() => {
      if (this.loadStatus) {
        this.loadStatus.next(RestResource.LOADING);
      }
      return requestObservableFn()
        .map((res) => {
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.IDLE);
          }
          return res;
        }).catch((err)=>{
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.ERROR);
          }
          return Observable.of(null);
        });
    }).publishReplay(1).refCount();
  }

  reload() {
    this.reloadTrigger.next(null);
  }

}

rxjs 5.3.0

Я не был доволен .map(myFunction).publishReplay(1).refCount()

С несколькими подписчиками, .map() исполняет myFunction дважды в некоторых случаях (я ожидаю, что он будет выполняться только один раз). Кажется, одно исправление publishReplay(1).refCount().take(1)

Еще одна вещь, которую вы можете сделать, это просто не использовать refCount() и сразу же сделайте Observable горячим:

let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;

Это запустит HTTP-запрос независимо от подписчиков. Я не уверен, отменит ли отмена подписки до завершения HTTP GET или нет.

Вы можете просто использовать ngx-cacheable! Это лучше подходит для вашего сценария.

Преимущество использования этого

  • Он вызывает rest API только один раз, кэширует ответ и возвращает то же самое для следующих запросов.
  • Может вызывать API по мере необходимости после создания / обновления / удаления.

Итак, ваш класс обслуживания будет что-то вроде этого -

import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';

const customerNotifier = new Subject();

@Injectable()
export class customersService {

    // relieves all its caches when any new value is emitted in the stream using notifier
    @Cacheable({
        cacheBusterObserver: customerNotifier,
        async: true
    })
    getCustomer() {
        return this.http.get('/someUrl').map(res => res.json());
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    addCustomer() {
        // some code
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    updateCustomer() {
        // some code
    }
}

Вот ссылка для получения дополнительной ссылки.

Это именно то, для чего я создал библиотеку ngx-rxcache.

Посмотрите на это по адресу https://github.com/adriandavidbrand/ngx-rxcache и посмотрите рабочий пример на https://stackblitz.com/edit/angular-jxqaiv

Я написал кеш класс,

/**
 * Caches results returned from given fetcher callback for given key,
 * up to maxItems results, deletes the oldest results when full (FIFO).
 */
export class StaticCache
{
    static cachedData: Map<string, any> = new Map<string, any>();
    static maxItems: number = 400;

    static get(key: string){
        return this.cachedData.get(key);
    }

    static getOrFetch(key: string, fetcher: (string) => any): any {
        let value = this.cachedData.get(key);

        if (value != null){
            console.log("Cache HIT! (fetcher)");
            return value;
        }

        console.log("Cache MISS... (fetcher)");
        value = fetcher(key);
        this.add(key, value);
        return value;
    }

    static add(key, value){
        this.cachedData.set(key, value);
        this.deleteOverflowing();
    }

    static deleteOverflowing(): void {
        if (this.cachedData.size > this.maxItems) {
            this.deleteOldest(this.cachedData.size - this.maxItems);
        }
    }

    /// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration.
    /// However that seems not to work. Trying with forEach.
    static deleteOldest(howMany: number): void {
        //console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size);
        let iterKeys = this.cachedData.keys();
        let item: IteratorResult<string>;
        while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){
            //console.debug("    Deleting: " + item.value);
            this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS.
        }
    }

    static clear(): void {
        this.cachedData = new Map<string, any>();
    }

}

Это все статично из-за того, как мы его используем, но не стесняйтесь сделать его обычным классом и сервисом. Я не уверен, что Angular хранит один экземпляр за все время (новичок в Angular2).

И вот как я это использую:

            let httpService: Http = this.http;
            function fetcher(url: string): Observable<any> {
                console.log("    Fetching URL: " + url);
                return httpService.get(url).map((response: Response) => {
                    if (!response) return null;
                    if (typeof response.json() !== "array")
                        throw new Error("Graph REST should return an array of vertices.");
                    let items: any[] = graphService.fromJSONarray(response.json(), httpService);
                    return array ? items : items[0];
                });
            }

            // If data is a link, return a result of a service call.
            if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link")
            {
                // Make an HTTP call.
                let url = this.data[verticesLabel][name]["link"];
                let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher);
                if (!cachedObservable)
                    throw new Error("Failed loading link: " + url);
                return cachedObservable;
            }

Я предполагаю, что может быть более умный способ, который будет использовать некоторые Observable трюки, но это было хорошо для моих целей.

Просто позвоните share() после карты и перед любой подпиской.

В моем случае у меня есть универсальный сервис (RestClientService.ts), который выполняет вызов rest, извлекает данные, проверяет на наличие ошибок и возвращает наблюдаемый в конкретную службу реализации (например, ContractClientService.ts), наконец, эту конкретную реализацию. возвращает наблюдаемый в de ContractComponent.ts, и этот подписывается, чтобы обновить представление.

RestClientService.ts:

export abstract class RestClientService<T extends BaseModel> {

      public GetAll = (path: string, property: string): Observable<T[]> => {
        let fullPath = this.actionUrl + path;
        let observable = this._http.get(fullPath).map(res => this.extractData(res, property));
        observable = observable.share();  //allows multiple subscribers without making again the http request
        observable.subscribe(
          (res) => {},
          error => this.handleError2(error, "GetAll", fullPath),
          () => {}
        );
        return observable;
      }

  private extractData(res: Response, property: string) {
    ...
  }
  private handleError2(error: any, method: string, path: string) {
    ...
  }

}

ContractService.ts:

export class ContractService extends RestClientService<Contract> {
  private GET_ALL_ITEMS_REST_URI_PATH = "search";
  private GET_ALL_ITEMS_PROPERTY_PATH = "contract";
  public getAllItems(): Observable<Contract[]> {
    return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH);
  }

}

ContractComponent.ts:

export class ContractComponent implements OnInit {

  getAllItems() {
    this.rcService.getAllItems().subscribe((data) => {
      this.items = data;
   });
  }

}

Просто используйте этот слой кеша, он делает все, что вам нужно, и даже управляйте кешем для запросов ajax.

http://www.ravinderpayal.com/blogs/12Jan2017-Ajax-Cache-Mangement-Angular2-Service.html

Это очень легко использовать

@Component({
    selector: 'home',
    templateUrl: './html/home.component.html',
    styleUrls: ['./css/home.component.css'],
})
export class HomeComponent {
    constructor(AjaxService:AjaxService){
        AjaxService.postCache("/api/home/articles").subscribe(values=>{console.log(values);this.articles=values;});
    }

    articles={1:[{data:[{title:"first",sort_text:"description"},{title:"second",sort_text:"description"}],type:"Open Source Works"}]};
}

Слой (как инъекционный угловой сервис)

import { Injectable }     from '@angular/core';
import { Http, Response} from '@angular/http';
import { Observable }     from 'rxjs/Observable';
import './../rxjs/operator'
@Injectable()
export class AjaxService {
    public data:Object={};
    /*
    private dataObservable:Observable<boolean>;
     */
    private dataObserver:Array<any>=[];
    private loading:Object={};
    private links:Object={};
    counter:number=-1;
    constructor (private http: Http) {
    }
    private loadPostCache(link:string){
     if(!this.loading[link]){
               this.loading[link]=true;
               this.links[link].forEach(a=>this.dataObserver[a].next(false));
               this.http.get(link)
                   .map(this.setValue)
                   .catch(this.handleError).subscribe(
                   values => {
                       this.data[link] = values;
                       delete this.loading[link];
                       this.links[link].forEach(a=>this.dataObserver[a].next(false));
                   },
                   error => {
                       delete this.loading[link];
                   }
               );
           }
    }

    private setValue(res: Response) {
        return res.json() || { };
    }

    private handleError (error: Response | any) {
        // In a real world app, we might use a remote logging infrastructure
        let errMsg: string;
        if (error instanceof Response) {
            const body = error.json() || '';
            const err = body.error || JSON.stringify(body);
            errMsg = `${error.status} - ${error.statusText || ''} ${err}`;
        } else {
            errMsg = error.message ? error.message : error.toString();
        }
        console.error(errMsg);
        return Observable.throw(errMsg);
    }

    postCache(link:string): Observable<Object>{

         return Observable.create(observer=> {
             if(this.data.hasOwnProperty(link)){
                 observer.next(this.data[link]);
             }
             else{
                 let _observable=Observable.create(_observer=>{
                     this.counter=this.counter+1;
                     this.dataObserver[this.counter]=_observer;
                     this.links.hasOwnProperty(link)?this.links[link].push(this.counter):(this.links[link]=[this.counter]);
                     _observer.next(false);
                 });
                 this.loadPostCache(link);
                 _observable.subscribe(status=>{
                     if(status){
                         observer.next(this.data[link]);
                     }
                     }
                 );
             }
            });
        }
}

Вы пробовали запустить код, который у вас уже есть?

Потому что вы строите Observable из обещания, вытекающего из getJSON(), сетевой запрос делается до того, как кто-либо подпишется. И полученное обещание разделяют все подписчики.

var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...
Другие вопросы по тегам