Весенняя интеграция DSL Scatter-Gather асинхронного / параллельного выполнения для нескольких receientFlow
Мы пытаемся делать параллельные звонки разным получателям, используя scatter-collect, и это работает нормально. Но второй поток получателей не запускается, если первый не завершен (прослеживается в Zipkin). есть способ сделать всех получателей асинхронными.. очень похожими на split-aggregate с каналом executor.
public IntegrationFlow flow1() {
return flow -> flow
.split().channel(c -> c.executor(Executors.newCachedThreadPool()))
.scatterGather(
scatterer -> scatterer
.applySequence(true)
.recipientFlow(flow2())
.recipientFlow(flow3())
.recipientFlow(flow4())
.recipientFlow(flow5()),
gatherer -> gatherer
.outputProcessor(messageGroup -> {
Object request = gatherResponse(messageGroup);
return createResponse(request);
}))
.aggregate();
}
Методы flow2(),flow3(),flow4() являются методами с InterationFlow
как тип возврата.
образец кода flow2()
:
public IntegrationFlow flow2() {
return integrationFlowDefinition -> integrationFlowDefinition
.enrichHeaders(
h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
.transform(ele -> createRequest1(ele))
.wireTap("asyncXMLLogging")
.handle(wsGateway.applyAsHandler(endpoint1))
.transform(
ele -> response2(ele));
}
1 ответ
Это действительно возможно с указанным executor channel
, Все потоки получателей должны начинаться с ExecutorChannel
, В вашем случае вы должны изменить их все на что-то вроде этого:
public IntegrationFlow flow2() {
return IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
.enrichHeaders(
h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
.transform(ele -> createRequest1(ele))
.wireTap("asyncXMLLogging")
.handle(wsGateway.applyAsHandler(endpoint1))
.transform(
ele -> response2(ele))
.get();
}
Обратите внимание на IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
, Именно так вы можете сделать каждый подпоток асинхронным.
ОБНОВИТЬ
Для более старой версии Spring Integration без IntegrationFlow
улучшение для подпотоков мы можем сделать так:
public IntegrationFlow flow2() {
return integrationFlowDefinition -> integrationFlowDefinition
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.enrichHeaders(
h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
.transform(ele -> createRequest1(ele))
.wireTap("asyncXMLLogging")
.handle(wsGateway.applyAsHandler(endpoint1))
.transform(
ele -> response2(ele));
}
Это похоже на то, что вы показываете в комментарии выше.