Spring Integration извлекает разбитые на страницы результаты из службы REST
Я работаю над интеграцией со службой REST, идея в том, что она опрашивается исходящим шлюзом marketingCategoryOutboundGateway
осуществляется HttpRequestExecutingMessageHandler
, Шлюз отправляет запрос в службу REST и передает свой ответ marketingCategory
канал. Сам шлюз запускается сообщением, созданным marketingCategoryPollerMessageSource
с использованием makeTriggeringMessage
заводской метод.
Проблема в том, что сервис возвращает постраничные результаты. Я что-то, что бы слушать на marketingCategory
канал, кроме уже имеющегося у меня активатора службы, проверьте ответ и отправьте новое сообщение с увеличенным номером страницы, созданным makeTriggeringMessage
к marketingCategoryPoller
канал, так что код будет вращаться в цикле, пока не получит все страницы из службы REST.
Позволяет ли Spring Integration создавать такие фильтры, которые получают одно сообщение на входном канале, проверяют его на соответствие условию и отправляют новое сообщение на выходной канал, если условие выполняется?
Код:
//Responses from the REST service go to this channel
@Bean("marketingCategory")
MessageChannel marketingCategory() { return new PublishSubscribeChannel();}
//This channel is used to trigger the outbound gateway which makes a request to the REST service
@Bean
MessageChannel marketingCategoryPoller() {return new DirectChannel();}
//An adapter creating triggering messages for the gateway
@Bean
@InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
public MessageSource<String> marketingCategoryPollerMessageSource() { return () -> makeTriggeringMessage(1);}
//A factory for producing messages which trigger the gateway
private Message<String> makeTriggeringMessage(int page) {
//make a message for triggering marketingCategoryOutboundGateway
return MessageBuilder.withPayload("")
.setHeader("Host", "eclinic")
.setHeader("page", page)
.build();
}
//An outbound gateway, makes a request to the REST service and returns the response to marketingCategory channel
@Bean
@ServiceActivator(inputChannel = "marketingCategoryPoller")
public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
//make a request to the REST service and push the response to the marketingCategory channel
}
//handler for REST service responses
@Bean
@ServiceActivator(inputChannel = "marketingCategory")
public MessageHandler marketingCategoryHandler() {
return (msg) -> {
//process the categories returned by marketingCategoryOutboundGateway
};
}
1 ответ
Я нашел решение, основанное на этой публикации. Прочитайте и загрузите из разбитых на страницы REST-Services с весенней интеграцией:
Запустите исходящий шлюз, который общается со службой REST и передает ответ на канал, используя адаптер входящего канала с устройством опроса. Адаптер входящего канала - это источник сообщений, который изначально генерирует сообщение с заголовком, указывающим номер страницы, который нужно извлечь из REST API. Заголовок сообщения страницы используется исходящим шлюзом для генерации URL, указывающего нужную страницу
Канал, на который исходящий шлюз отправляет ответы службы REST, имеет 2 подписчика:
2.1. активатор службы, который делает что-то с извлеченными данными
2.2. фильтр, который проверяет, является ли это последней страницей, и если нет, он отправляет сообщение дальше в другой канал, используемый обогащателем заголовка
Получив сообщение, обогащение заголовка увеличивает свой заголовок страницы и продвигает сообщение дальше к каналу, который запускает исходящий шлюз, шлюз считывает увеличенный заголовок страницы и выбирает следующую страницу из службы REST.
Цикл продолжает вращаться до тех пор, пока служба REST не вернет последнюю страницу. Фильтр не позволяет этому сообщению проходить к заголовку, который прерывает цикл.
Полный код:
@Configuration
public class IntegrationConfiguration {
private final ApiGateConfig apiGateConfig;
IntegrationConfiguration(ApiGateConfig apiGateConfig) {
this.apiGateConfig = apiGateConfig;
}
@Bean("marketingCategory")
MessageChannel marketingCategory() {
return new PublishSubscribeChannel();
}
@Bean
MessageChannel marketingCategoryPoller() {
return new DirectChannel();
}
@Bean
MessageChannel marketingCategoryPollerNextPage() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
public MessageSource<RestPageImpl<MarketingCategory>> marketingCategoryPollerMessageSource() {
return () -> makeTriggeringMessage(0);
}
/**
* Build a gateway triggering message
*/
private Message<RestPageImpl<MarketingCategory>> makeTriggeringMessage(int page) {
return MessageBuilder.withPayload(new RestPageImpl<MarketingCategory>())
.setHeader("Host", "eclinic")
.setHeader("page", page)
.build();
}
@Bean
@ServiceActivator(inputChannel = "marketingCategoryPoller")
public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
String uri = apiGateConfig.getUri() + "/marketingCategories?page={page}";
//the type of the payload
ParameterizedTypeReference<RestPageImpl<MarketingCategory>> type = new ParameterizedTypeReference<>() {
};
//page number comes from the message
SpelExpressionParser expressionParser = new SpelExpressionParser();
var uriVariables = new HashMap<String, Expression>();
uriVariables.put("page", expressionParser.parseExpression("headers.page"));
HttpRequestExecutingMessageHandler handler = new HttpRequestExecutingMessageHandler(uri);
handler.setHttpMethod(HttpMethod.GET);
handler.setExpectedResponseTypeExpression(new ValueExpression<>(type));
handler.setOutputChannel(channel);
handler.setUriVariableExpressions(uriVariables);
return handler;
}
@Bean
@ServiceActivator(inputChannel = "marketingCategory")
public MessageHandler marketingCategoryHandler() {
return (msg) -> {
var page = (RestPageImpl<MarketingCategory>) msg.getPayload();
System.out.println("Page #" + page.getNumber());
page.getContent().forEach(c -> System.out.println(c.getMarketingCategory()));
};
}
@Filter(inputChannel = "marketingCategory", outputChannel = "marketingCategoryPollerNextPage")
public boolean marketingCategoryPaginationFilter(RestPageImpl<MarketingCategory> page) {
return !page.isLast();
}
@Bean
@Transformer(inputChannel = "marketingCategoryPollerNextPage", outputChannel = "marketingCategoryPoller")
HeaderEnricher incrementPage() {
Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
Expression expression = new SpelExpressionParser().parseExpression("headers.page+1");
var valueProcessor = new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, Integer.class);
valueProcessor.setOverwrite(true);
headersToAdd.put("page", valueProcessor);
return new HeaderEnricher(headersToAdd);
}
}