Как узнать, когда выборка закончится, не блокируя основной поток?

Я делаю запросы к API, но их сервер разрешает только определенное количество активных подключений, поэтому я хотел бы ограничить количество текущих выборок. Для моих целей выборка завершается (а не продолжается) только тогда, когда тело ответа HTTP достигает клиента.

Я хотел бы создать такую ​​абстракцию:

const fetchLimiter = new FetchLimiter(maxConnections);
fetchLimiter.fetch(url, options); // Returns the same thing as fetch()

Это значительно упростило бы задачу, но, похоже, нет способа узнать, когда заканчивается поток, используемый другим кодом, потому что потоки блокируются во время чтения. Можно использоватьReadableStream.tee() чтобы разделить поток на два, используйте один и верните другой вызывающему (возможно, также создавая Response с ним), но это снизит производительность, верно?

1 ответ

Поскольку fetch использует обещания, вы можете воспользоваться этим, чтобы создать простую систему очередей.

Это метод, который я использовал раньше для постановки в очередь вещей, основанных на обещаниях. Он ставит элементы в очередь, создаваяPromiseа затем добавив его преобразователь в массив. Конечно, пока это Обещание не будет выполнено,await сохраняет любые последующие обещания от использования.

И все, что нам нужно сделать, чтобы начать следующую выборку по завершении одной, - это просто взять следующий преобразователь и вызвать его. Обещание разрешается, а затемfetch начинается!

Лучшая часть, поскольку мы фактически не потребляем fetch результат, не нужно беспокоиться о необходимости clone или что-нибудь... мы просто передаем его в целости и сохранности, чтобы вы могли употребить его позже then или что-то.

* Изменить: поскольку тело все еще передается после выполнения обещания выборки, я добавил третью опцию, чтобы вы могли передать тип тела, и чтобы FetchLimiter извлекал и анализировал тело за вас.

Все они возвращают обещание, которое в конечном итоге разрешается с фактическим содержанием.

Таким образом, вы можете просто заставить FetchLimiter проанализировать тело за вас. Я сделал так, чтобы он возвращал массив[response, data], таким образом вы по-прежнему можете проверять такие вещи, как код ответа, заголовки и т. д.

В этом отношении вы даже можете передать обратный вызов или что-то для использования в этом await если вам нужно сделать что-то более сложное, например, различные методы синтаксического анализа тела в зависимости от кода ответа.

пример

Я добавил комментарии, чтобы указать, где FetchLimiter код начинается и заканчивается... остальное - просто демонстрационный код.

Это подделка fetchиспользуя setTimeout, который разрешается в пределах 0,5–1,5 секунды. Он немедленно запустит первые три запроса, а затем будут заполнены активные запросы, и он будет ждать разрешения одного из них.

Когда это произойдет, вы увидите комментарий о том, что обещание было разрешено, затем начнется следующее обещание в очереди, а затем вы увидите then из в forразрешение цикла. Я добавил этоthen просто чтобы вы могли видеть порядок событий.

(function() {
  const fetch = (resource, init) => new Promise((resolve, reject) => {
    console.log('starting ' + resource);
    setTimeout(() => {
      console.log(' - resolving ' + resource);
      resolve(resource);
    }, 500 + 1000 * Math.random());
  });

  function FetchLimiter() {
    this.queue = [];
    this.active = 0;
    this.maxActive = 3;
    this.fetchFn = fetch;
  }
  FetchLimiter.prototype.fetch = async function(resource, init, respType) {
    // if at max active, enqueue the next request by adding a promise
    // ahead of it, and putting the resolver in the "queue" array.
    if (this.active >= this.maxActive) {
      await new Promise(resolve => {
        this.queue.push(resolve); // push, adds to end of array
      });
    }
    this.active++; // increment active once we're about to start the fetch
    const resp = await this.fetchFn(resource, init);
    let data;
    if (['arrayBuffer', 'blob', 'json', 'text', 'formData'].indexOf(respType) >= 0)
      data = await resp[respType]();
    this.active--; // decrement active once fetch is done
    this.checkQueue(); // time to start the next fetch from queue
    return [resp, data]; // return value from fetch
  };

  FetchLimiter.prototype.checkQueue = function() {
    if (this.active < this.maxActive && this.queue.length) {
      // shfit, pulls from start of array. This gives first in, first out.
      const next = this.queue.shift();
      next('resolved'); // resolve promise, value doesn't matter
    }
  }

  const limiter = new FetchLimiter();
  for (let i = 0; i < 9; i++) {
    limiter.fetch('/mypage/' + i)
      .then(x => console.log(' - .then ' + x));
  }
})();

Предостережения:

  • Я не на 100% уверен, что тело все еще передается, когда обещание разрешается... это, похоже, вас беспокоит. Однако, если это проблема, вы можете использовать один из методов миксина Body, напримерblob или text или json, который не разрешается, пока содержимое тела не будет полностью проанализировано (см. здесь)

  • Я намеренно сделал его очень коротким (например, 15 строк фактического кода) в качестве очень простого доказательства концепции. Вы бы хотели добавить некоторую обработку ошибок в производственный код, чтобы, еслиfetch отклоняет из-за ошибки подключения или чего-то еще, что вы все еще уменьшаете активный счетчик и запускаете следующий fetch.

  • Конечно, он также использует async/awaitсинтаксис, потому что его намного легче читать. Если вам нужна поддержка старых браузеров, вы захотите переписать с помощью Promises или перенести с помощью babel или аналогичного.

Другие вопросы по тегам