Читайте и скачивайте с нумерованных REST-сервисов с пружинной интеграцией

В настоящее время я работаю над приложением Spring Integration, которое имеет следующий сценарий:

  1. int-http:outbound-gateway читать из REST-Services список разбитых на страницы элементов: о в
  2. Каждое содержимое страницы разбивается и сохраняется в папке для последующей обработки весенним пакетным заданием.

Я довольно новичок в Spring-интеграции и не знаю, возможно ли создать цикл с помощью int-http:outbound-gateway для чтения всех страниц до последней.

Мы говорим о 66254 элементах, разделенных на 2651 страницу. То, что я ищу, - это лучшая практика для чтения и загрузки всех страниц и сбора данных без проблем с памятью.

Любое предложение будет оценено

Спасибо

2 ответа

Решение

Да, это возможно, хотя немного сложнее.

Предположим, что ваш REST сервис требует page как параметр запроса, так что вы хотели бы сделать запрос со страницы #1 и цикла (приращение page param) пока сервис не вернет пустой результат.

Итак, у вас может быть конфигурация для службы REST, например:

<int-http:outbound-gateway url="http://service/elements?page={page}">
    <int-http:uri-variable name="page" expression="headers['page']"/>
</int-http:outbound-gateway>

Обратите на это внимание <int-http:uri-variable> определение. С самого начала вы должны отправить сообщение этому <int-http:outbound-gateway> с page заголовок как 1,

Ответ от этого шлюза вы должны отправить на что-то вроде <recipient-list-router>, или же <publish-subscribe-channel>где один из абонентов по-прежнему ваш splitter чтобы хранить предметы в папке.

Другой подписчик немного умный. Начинается с <filter> проверить, если payload (результат вызова REST) ​​пуст, что означает, что мы сделали, и больше нет страниц в сервисе для извлечения. В противном случае вы используете <header-enricher> увеличить и заменить page заголовок и отправить результат в том, что наш первый <int-http:outbound-gateway>,

Я сделал нечто подобное, используя Java DSL для Spring Integration:

      @Bean
IntegrationFlow servicePointIntegrationFlow() {
    return IntegrationFlows.from("servicePointExportChannel")
            .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header("pageNumber", new AtomicInteger()))
            .channel("singlePageRequestChannel")
            .get();
}
/**
 * Get a single page, and persist the contents of that page to the DB
 * Then, check if there are more pages.
 *   * If yes, request the next page by adding another message to the same channel (but with incremented pageNumber in the message header)
 *   * If no, publish a message to the next channel
 */
@Bean
IntegrationFlow paginatedDataFlow(ServicePointTransformer servicePointTransformer) {
    return IntegrationFlows.from("singlePageRequestChannel")
            .log(INFO, m -> "paginatedDataFlow called with pageNumber: " + m.getHeaders().get("pageNumber"))
            .handle(Http.outboundGateway(m -> myUrl + "&pageNumber={pageNumber}")
                    .uriVariable("pageNumber", m -> m.getHeaders().get("pageNumber", AtomicInteger.class).getAndIncrement())
                    .httpMethod(HttpMethod.GET)
                    .expectedResponseType(MyPage.class)
                    .mappedResponseHeaders(""))
            .routeToRecipients(route -> route
                    .recipientFlow(flow -> flow
                            .transform(MyPage::getServicePoints)
                            .transform(servicePointTransformer)
                            .handle(Jpa.updatingGateway(myEntityManagerFactory),
                                    spec -> spec.transactional(myTransactionManager))
                    .recipientFlow("payload.isNextPageAvailable", f -> f.channel("singlePageRequestChannel"))
                    .recipientFlow("!payload.isNextPageAvailable", f -> f.channel("someOtherChannel")))
            .get();
}

Полезная нагрузка / страница - это JSON и содержит поле верхнего уровня. nextPageAvailable который может быть доступен в десериализованной форме как MyPage.isNextPageAvailable().

Другие вопросы по тегам