Весенняя интеграция DSL Scatter-Gather асинхронного / параллельного выполнения для нескольких receientFlow

Мы пытаемся делать параллельные звонки разным получателям, используя scatter-collect, и это работает нормально. Но второй поток получателей не запускается, если первый не завершен (прослеживается в Zipkin). есть способ сделать всех получателей асинхронными.. очень похожими на split-aggregate с каналом executor.

public IntegrationFlow flow1() {

        return flow -> flow
                .split().channel(c -> c.executor(Executors.newCachedThreadPool()))
                .scatterGather(
                        scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(flow2())
                                .recipientFlow(flow3())
                                .recipientFlow(flow4())
                                .recipientFlow(flow5()),
                        gatherer -> gatherer
                                .outputProcessor(messageGroup -> {
                                    Object request = gatherResponse(messageGroup);
                                    return createResponse(request);
                                }))
                .aggregate();
    }

Методы flow2(),flow3(),flow4() являются методами с InterationFlow как тип возврата.

образец кода flow2():

public IntegrationFlow flow2() {
        return integrationFlowDefinition -> integrationFlowDefinition
                .enrichHeaders(
                        h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
                .transform(ele -> createRequest1(ele))                  
                .wireTap("asyncXMLLogging")
                .handle(wsGateway.applyAsHandler(endpoint1))
                .transform(
                        ele -> response2(ele));
    }

1 ответ

Решение

Это действительно возможно с указанным executor channel, Все потоки получателей должны начинаться с ExecutorChannel, В вашем случае вы должны изменить их все на что-то вроде этого:

public IntegrationFlow flow2() {
    return IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))                  
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele))
            .get();
}

Обратите внимание на IntegrationFlows.from(MessageChannels.executor(taskExexecutor())), Именно так вы можете сделать каждый подпоток асинхронным.

ОБНОВИТЬ

Для более старой версии Spring Integration без IntegrationFlow улучшение для подпотоков мы можем сделать так:

public IntegrationFlow flow2() {
    return integrationFlowDefinition -> integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))                  
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele));
}

Это похоже на то, что вы показываете в комментарии выше.

Другие вопросы по тегам