Spring Integration извлекает разбитые на страницы результаты из службы REST

Я работаю над интеграцией со службой REST, идея в том, что она опрашивается исходящим шлюзом marketingCategoryOutboundGateway осуществляется HttpRequestExecutingMessageHandler, Шлюз отправляет запрос в службу REST и передает свой ответ marketingCategory канал. Сам шлюз запускается сообщением, созданным marketingCategoryPollerMessageSource с использованием makeTriggeringMessage заводской метод.

Проблема в том, что сервис возвращает постраничные результаты. Я что-то, что бы слушать на marketingCategory канал, кроме уже имеющегося у меня активатора службы, проверьте ответ и отправьте новое сообщение с увеличенным номером страницы, созданным makeTriggeringMessage к marketingCategoryPoller канал, так что код будет вращаться в цикле, пока не получит все страницы из службы REST.

Позволяет ли Spring Integration создавать такие фильтры, которые получают одно сообщение на входном канале, проверяют его на соответствие условию и отправляют новое сообщение на выходной канал, если условие выполняется?

Код:

//Responses from the REST service go to this channel
@Bean("marketingCategory")
MessageChannel marketingCategory() { return new PublishSubscribeChannel();}

//This channel is used to trigger the outbound gateway which makes a request to the REST service
@Bean
MessageChannel marketingCategoryPoller() {return new DirectChannel();}

//An adapter creating triggering messages for the gateway
@Bean
@InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
public MessageSource<String> marketingCategoryPollerMessageSource() { return () -> makeTriggeringMessage(1);}

//A factory for producing messages which trigger the gateway
private Message<String> makeTriggeringMessage(int page) {
    //make a message for triggering marketingCategoryOutboundGateway
    return MessageBuilder.withPayload("")
            .setHeader("Host", "eclinic")
            .setHeader("page", page)
            .build();
}

//An outbound gateway, makes a request to the REST service and returns the response to marketingCategory channel
@Bean
@ServiceActivator(inputChannel = "marketingCategoryPoller")
public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
    //make a request to the REST service and push the response to the marketingCategory channel
}

//handler for REST service responses
@Bean
@ServiceActivator(inputChannel = "marketingCategory")
public MessageHandler marketingCategoryHandler() {
    return (msg) -> {
        //process the categories returned by marketingCategoryOutboundGateway
    };
}

1 ответ

Решение

Я нашел решение, основанное на этой публикации. Прочитайте и загрузите из разбитых на страницы REST-Services с весенней интеграцией:

  1. Запустите исходящий шлюз, который общается со службой REST и передает ответ на канал, используя адаптер входящего канала с устройством опроса. Адаптер входящего канала - это источник сообщений, который изначально генерирует сообщение с заголовком, указывающим номер страницы, который нужно извлечь из REST API. Заголовок сообщения страницы используется исходящим шлюзом для генерации URL, указывающего нужную страницу

  2. Канал, на который исходящий шлюз отправляет ответы службы REST, имеет 2 подписчика:

    2.1. активатор службы, который делает что-то с извлеченными данными

    2.2. фильтр, который проверяет, является ли это последней страницей, и если нет, он отправляет сообщение дальше в другой канал, используемый обогащателем заголовка

  3. Получив сообщение, обогащение заголовка увеличивает свой заголовок страницы и продвигает сообщение дальше к каналу, который запускает исходящий шлюз, шлюз считывает увеличенный заголовок страницы и выбирает следующую страницу из службы REST.

  4. Цикл продолжает вращаться до тех пор, пока служба REST не вернет последнюю страницу. Фильтр не позволяет этому сообщению проходить к заголовку, который прерывает цикл.

Полный код:

@Configuration
public class IntegrationConfiguration {

    private final ApiGateConfig apiGateConfig;

    IntegrationConfiguration(ApiGateConfig apiGateConfig) {
        this.apiGateConfig = apiGateConfig;
    }

    @Bean("marketingCategory")
    MessageChannel marketingCategory() {
        return new PublishSubscribeChannel();
    }

    @Bean
    MessageChannel marketingCategoryPoller() {
        return new DirectChannel();
    }

    @Bean
    MessageChannel marketingCategoryPollerNextPage() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
    public MessageSource<RestPageImpl<MarketingCategory>> marketingCategoryPollerMessageSource() {
        return () -> makeTriggeringMessage(0);
    }

    /**
     * Build a gateway triggering message
     */
    private Message<RestPageImpl<MarketingCategory>> makeTriggeringMessage(int page) {
        return MessageBuilder.withPayload(new RestPageImpl<MarketingCategory>())
                .setHeader("Host", "eclinic")
                .setHeader("page", page)
                .build();
    }

    @Bean
    @ServiceActivator(inputChannel = "marketingCategoryPoller")
    public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {

        String uri = apiGateConfig.getUri() + "/marketingCategories?page={page}";

        //the type of the payload
        ParameterizedTypeReference<RestPageImpl<MarketingCategory>> type = new ParameterizedTypeReference<>() {
        };

        //page number comes from the message
        SpelExpressionParser expressionParser = new SpelExpressionParser();
        var uriVariables = new HashMap<String, Expression>();
        uriVariables.put("page", expressionParser.parseExpression("headers.page"));

        HttpRequestExecutingMessageHandler handler = new HttpRequestExecutingMessageHandler(uri);
        handler.setHttpMethod(HttpMethod.GET);
        handler.setExpectedResponseTypeExpression(new ValueExpression<>(type));
        handler.setOutputChannel(channel);
        handler.setUriVariableExpressions(uriVariables);

        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "marketingCategory")
    public MessageHandler marketingCategoryHandler() {
        return (msg) -> {
            var page = (RestPageImpl<MarketingCategory>) msg.getPayload();

            System.out.println("Page #" + page.getNumber());

            page.getContent().forEach(c -> System.out.println(c.getMarketingCategory()));

        };
    }

    @Filter(inputChannel = "marketingCategory", outputChannel = "marketingCategoryPollerNextPage")
    public boolean marketingCategoryPaginationFilter(RestPageImpl<MarketingCategory> page) {
        return !page.isLast();
    }

    @Bean
    @Transformer(inputChannel = "marketingCategoryPollerNextPage", outputChannel = "marketingCategoryPoller")
    HeaderEnricher incrementPage() {
        Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
        Expression expression = new SpelExpressionParser().parseExpression("headers.page+1");

        var valueProcessor = new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, Integer.class);
        valueProcessor.setOverwrite(true);

        headersToAdd.put("page", valueProcessor);
        return new HeaderEnricher(headersToAdd);
    }
}
Другие вопросы по тегам