Читайте и скачивайте с нумерованных REST-сервисов с пружинной интеграцией
В настоящее время я работаю над приложением Spring Integration, которое имеет следующий сценарий:
int-http:outbound-gateway
читать из REST-Services список разбитых на страницы элементов: о в- Каждое содержимое страницы разбивается и сохраняется в папке для последующей обработки весенним пакетным заданием.
Я довольно новичок в Spring-интеграции и не знаю, возможно ли создать цикл с помощью int-http:outbound-gateway для чтения всех страниц до последней.
Мы говорим о 66254 элементах, разделенных на 2651 страницу. То, что я ищу, - это лучшая практика для чтения и загрузки всех страниц и сбора данных без проблем с памятью.
Любое предложение будет оценено
Спасибо
2 ответа
Да, это возможно, хотя немного сложнее.
Предположим, что ваш REST сервис требует page
как параметр запроса, так что вы хотели бы сделать запрос со страницы #1 и цикла (приращение page
param) пока сервис не вернет пустой результат.
Итак, у вас может быть конфигурация для службы REST, например:
<int-http:outbound-gateway url="http://service/elements?page={page}">
<int-http:uri-variable name="page" expression="headers['page']"/>
</int-http:outbound-gateway>
Обратите на это внимание <int-http:uri-variable>
определение. С самого начала вы должны отправить сообщение этому <int-http:outbound-gateway>
с page
заголовок как 1
,
Ответ от этого шлюза вы должны отправить на что-то вроде <recipient-list-router>
, или же <publish-subscribe-channel>
где один из абонентов по-прежнему ваш splitter
чтобы хранить предметы в папке.
Другой подписчик немного умный. Начинается с <filter>
проверить, если payload
(результат вызова REST) пуст, что означает, что мы сделали, и больше нет страниц в сервисе для извлечения. В противном случае вы используете <header-enricher>
увеличить и заменить page
заголовок и отправить результат в том, что наш первый <int-http:outbound-gateway>
,
Я сделал нечто подобное, используя Java DSL для Spring Integration:
@Bean
IntegrationFlow servicePointIntegrationFlow() {
return IntegrationFlows.from("servicePointExportChannel")
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header("pageNumber", new AtomicInteger()))
.channel("singlePageRequestChannel")
.get();
}
/**
* Get a single page, and persist the contents of that page to the DB
* Then, check if there are more pages.
* * If yes, request the next page by adding another message to the same channel (but with incremented pageNumber in the message header)
* * If no, publish a message to the next channel
*/
@Bean
IntegrationFlow paginatedDataFlow(ServicePointTransformer servicePointTransformer) {
return IntegrationFlows.from("singlePageRequestChannel")
.log(INFO, m -> "paginatedDataFlow called with pageNumber: " + m.getHeaders().get("pageNumber"))
.handle(Http.outboundGateway(m -> myUrl + "&pageNumber={pageNumber}")
.uriVariable("pageNumber", m -> m.getHeaders().get("pageNumber", AtomicInteger.class).getAndIncrement())
.httpMethod(HttpMethod.GET)
.expectedResponseType(MyPage.class)
.mappedResponseHeaders(""))
.routeToRecipients(route -> route
.recipientFlow(flow -> flow
.transform(MyPage::getServicePoints)
.transform(servicePointTransformer)
.handle(Jpa.updatingGateway(myEntityManagerFactory),
spec -> spec.transactional(myTransactionManager))
.recipientFlow("payload.isNextPageAvailable", f -> f.channel("singlePageRequestChannel"))
.recipientFlow("!payload.isNextPageAvailable", f -> f.channel("someOtherChannel")))
.get();
}
Полезная нагрузка / страница - это JSON и содержит поле верхнего уровня.
nextPageAvailable
который может быть доступен в десериализованной форме как
MyPage.isNextPageAvailable()
.