Создание произвольного числа последовательных, зависимых запросов с помощью `cycle-http`

Есть ли примеры создания произвольного числа последовательных, зависимых HTTP-запросов с cycle-http?

Я хочу получить каждую страницу из API, где следующий запрос может быть сделан только с использованием данных на текущей странице.

Я пытался адаптировать этот ответ, который использует Observable.merge(), но я не уверен, как подключить его к cycle-http источники и раковины.

Рекомендации

3 ответа

Решение

Вот еще один пример произвольного числа последовательных зависимых запросов с использованием Cycle.js и @cycle/fetch Водитель.

(Использование API пользователей GitHub. Запрос пользователей возвращает 30 пользователей на страницу, а since Параметр URL является номером идентификатора пользователя и запускает запрос со следующего идентификатора пользователя.)

Сначала начальная часть main функция с комментариями:

const listResponse$ = sources.FETCH // response returned from FETCH driver
  .mergeAll()
  .flatMap(res => res.json())
  .scan(
    ((userstotals, users) =>
      [
        userstotals[0] + 1, // page count
        users[29] && users[29].id, // last id on full page
        userstotals[2].concat(users) // collect all users
      ]
    ),
    [0, undefined, []] // default accumulator
  )
  .share(); // allows stream split


// Branch #1 - cycle again for more pages
const listRequest$ = listResponse$
  .filter(users =>
    0 < users[1] && // full page id exists
    maxpages > users[0]; // less than maxpages
  )
  .startWith('initial')
  .map(users =>
    `https:\/\/api.github.com/users?since=${
      (!isNaN(parseInt(users[1], 10)) && users[1]) || // last id full page
      idstart // default id start
    }`
  );


// Branch #2 - display
const dom$ = listResponse$
  .map(userstotals => div(JSON.stringify(userstotals[2])));

(Это обновленный ответ. Я понял, scanс можно объединить в один.)

ОБЪЯСНЕНИЕ: Сначала вытащите ответ из sources имущество FETCH, расплющить его и вытащить JSON, затем scan посчитать, сколько страниц было запрошено до сих пор. Количество запрашиваемых страниц позже сравнивается с maxpages чтобы не превышать заранее установленное число. Далее получите последний id полной страницы, если существует, и последний, concat текущая страница пользователей с коллекцией страниц пользователей, накопленных до сих пор. После накопления ответной информации share поток, так что он может быть разделен на две ветви.

Первая ветвь используется для повторного цикла запроса обратно через FETCH драйвер для запроса большего количества страниц. Но сначала filter проверить последнюю страницу и количество запрашиваемых страниц. Если идентификатор не число, то последняя страница была достигнута. Не продолжайте, если последняя страница уже достигнута и, следовательно, больше нет страниц для запроса. Также не продолжайте, если количество запрашиваемых страниц превышает значение maxpages,

Вторая ветвь просто достигает накопленного ответа, чтобы получить полный список пользователей, затем JSON.stringifys объект и преобразует его в виртуальный объект dom (div метод) для отправки в драйвер DOM для отображения.


И вот полный сценарий:

import Cycle from '@cycle/rx-run';
import {div, makeDOMDriver} from '@cycle/dom';
import {makeFetchDriver} from '@cycle/fetch';

function main(sources) { // provides properties DOM and FETCH (evt. streams)

  const acctok = ''; // put your token here, if necessary
  const idstart = 19473200; // where do you want to start?
  const maxpages = 10;

  const listResponse$ = sources.FETCH
    .mergeAll()
    .flatMap(res => res.json())
    .scan(
      ((userstotals, users) =>
        [
          userstotals[0] + 1, // page count
          users[29] && users[29].id, // last id on full page
          userstotals[2].concat(users) // collect all users
        ]
      ),
      [0, undefined, []]
    )
    .share();

  const listRequest$ = listResponse$
    .filter(function (users) {
      return 0 < users[1] && maxpages > users[0];
    })
    .startWith('initial')
    .map(users =>
      `https:\/\/api.github.com/users?since=${
        (!isNaN(parseInt(users[1], 10)) && users[1]) || // last id full page
        idstart // default id start
      }` //&access_token=${acctok}`
    );

  const dom$ = listResponse$
    .map(userstotals => div(JSON.stringify(userstotals[2])));

  return {
    DOM: dom$,
    FETCH: listRequest$
  };
}

Cycle.run(main, {
  DOM: makeDOMDriver('#main-container'),
  FETCH: makeFetchDriver()
});

(Мой первый ответ, оставленный для потомков. Обратите внимание на два scanс.)

const listResponse$ = sources.FETCH
  .mergeAll()
  .flatMap(res => res.json())
  .scan(((userscount, users) =>              // <-- scan #1
    [userscount[0] + 1, users]), [0, []]
  )
  .share();

const listRequest$ = listResponse$
  .filter(function (users) {
    return users[1][29] && users[1][29].id &&
      maxpages > users[0];
  })
  .startWith('initial')
  .map(users =>
    `https://api.github.com/users?since=${
      (users[1][users[1].length-1] && users[1][users[1].length-1].id) ||
        idstart
      }`
  );

const dom$ = listResponse$
  .scan(function (usersall, users) {          // <-- scan #2
    usersall.push(users);
    return usersall;
  }, [])
  .map(res => div(JSON.stringify(res)));

От scanСначала, мне нужно было получить последний идентификатор последней страницы, если он существует, и сохранить его в аккумуляторе.

Будет лучше, если вы приведете пример кода. однако основная логика может быть такой:

  1. Сопоставить поток ответов с потоком запросов
  2. Запустить поток запросов с начальным запросом

Код будет выглядеть так:

function main (sources){
  const initialRequest = {
    url: 'http://www.google.com'
  };
  
  const request$ = sources.HTTP
  .filter(response$ => /*FILTER LOGIC GOES HERE */)
  .switch()//or you can use flatMap
  .map(response =>/* MAP RESPONSE TO A NEW REQUEST */)
  .startWith(initialRequest);
  
  return {
    HTTP: request$
  };
}

Так что это, вероятно, ужасно усложняется, и я должен отказаться от него, чтобы должным образом попробовать ответ Эрдала, но вот что я придумал...

использование

export default function app({HTTP}) {
  const {
    allPagesRequest$: staffPagesReq$,
    latestData$: staff$,
  } = getAllPages({url: '/staff', HTTP});

  // staff$ is converted to vdom...

  return /* sinks */ {
    DOM:  staffPagesReq$,
    HTTP: staffVdom$,
  }
}

реализация

const fetchNthPage = (optsIn) => {
  const opts = merge(
    {
      url:  '',
      page: 0,
      HTTP: undefined,
    }, optsIn
  );

  const u = new URI(opts.url)
    .setQuery({'_page': opts.page.toString()});

  const pageNResponse$ = opts.HTTP
    .filter(
      res$ => res$.request.url === urlForEndpoint(u)
    )
    .flatMap(
      res$ => res$.catch(
        err => Observable.of(
          {
            body: {'error in response:': err.toString()}
          }
        )
      )
    )
    .map(res => res.body)
    .take(1)
    .shareReplay(1);

  return Observable.of({
    pageNRequest$:  Observable.of(basicRequestObject(u)),
    pageNResponse$: pageNResponse$,
    opts:           opts
  });
};


const encapsulateAs = typeName => data => {
  return {type: typeName, data}
};


const fetchAllPagesIndividually = (optsIn) => {
  const opts = merge(
    {
      url:  '',
      page: 0,
      HTTP: undefined,
    },
    optsIn
  );

  return Observable.defer(
    () => fetchNthPage(opts)
      .flatMap(x => {
        const mergedItems$ = Observable
          .merge(
            x.pageNRequest$.map(encapsulateAs('request')),
            x.pageNResponse$.map(encapsulateAs('response'))
          );


        const optsForNextPage = merge(opts, {
          page: opts.page + 1
        });

        const next$ = Observable
          .never() // `next$` shouldn't end when `pageNResponse$` does
          .merge(x.pageNResponse$)
          .shareReplay(1)
          .takeWhile(res => {
            //noinspection UnnecessaryLocalVariableJS
            let isFullPage = path(['response', 'length'], res) === apiPageSize;
            return isFullPage;
          })
          .flatMap(() => {
            return fetchAllPagesIndividually(optsForNextPage)
          });

        //noinspection UnnecessaryLocalVariableJS
        const concattedItem$ = Observable
          .concat(
            mergedItems$,
            next$
          );

        return concattedItem$
      })
      .shareReplay(1)
  );
};


const concatPages = (acc, currentVal, index, source) => acc.concat(currentVal);

const typeIs = typeStr => compose(
  equals(typeStr),
  prop('type')
);

const typeNotIn = typesArray => compose(
  not,
  unary(flip(contains)(typesArray)),
  prop('type')
);

const getAllPages = (optsIn) => {
  const f$ = fetchAllPagesIndividually(optsIn)
    .shareReplay(1);

  const allPagesRequest$ = f$
    .filter(typeIs('request'))
    .map(prop('data'));

  const allPagesResponse$ = f$
    .filter(typeIs('response'))
    .map(prop('data'));

  const theRest$ = f$
    .filter(typeNotIn(['request', 'response', 'data']));

  const latestData$ = allPagesResponse$
    .map(prop('response'))
    .scan(concatPages);

  return {
    allPagesRequest$,
    allPagesResponse$,
    latestData$,
    theRest$,
  }
};

compose(), not(), merge(), unary()и т. д. из Рамда.

Другие вопросы по тегам