Тайм-аут интеграции весенней партии в RemoteChunking
Я пытаюсь настроить задачу RemoteChunking, используя Spring Boot, Spring Batch и Spring Integrations.
Я настроил activeMQ
Сервер и я начинаем настраивать Spring Batch, следуя официальным документам https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html.
Мой основной конфиг:
import com.arrobaautowired.payment.Payment;
import com.arrobaautowired.record.Record;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;
@Configuration
@Slf4j
@EnableBatchProcessing
public class MasterBatchConfiguration {
private final static String MASTER_JOB_TEST = "JOB_MASTER";
private final static String MATER_JOB_STEP = "STEP-1";
private final static int CHUNK_SIZE = 50;
private JobBuilderFactory jobBuilderFactory;
private StepBuilderFactory stepBuilderFactory;
private MultiResourceItemReader<Record> filesReader;
private StepListener stepListener;
@Autowired
public MasterBatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, MultiResourceItemReader<Record> filesReader, StepListener stepListener) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.filesReader = filesReader;
this.stepListener = stepListener;
}
@Bean
public Job processRecordsJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory
.get(MASTER_JOB_TEST)
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public TaskletStep step1() {
return stepBuilderFactory.get(MATER_JOB_STEP)
.<Record, Payment>chunk(CHUNK_SIZE)
.reader(filesReader)
.writer(itemWriter())
.listener(stepListener)
.build();
}
@Bean
public ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
factory.setTrustAllPackages(Boolean.TRUE);
return factory;
}
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow jmsOutboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from("requests")
.handle(Jms
.outboundAdapter(connectionFactory)
.destination("requests"))
.get();
}
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}
/*
* Configure the ChunkMessageChannelItemWriter.
* Se trata de un ItemWriter especial, {@link ChunkMessageChannelItemWriter}, que se encarga de enviar la información al pooleer (Middleware externo) y recogerla.
*/
@Bean
@StepScope
public ChunkMessageChannelItemWriter<Payment> itemWriter() {
ChunkMessageChannelItemWriter<Payment> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate());
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
@Bean
public MessagingTemplate messagingTemplate(){
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
return messagingTemplate;
}
}
Конфигурация моего раба:
import com.arrobaautowired.processor.PaymentWriter;
import com.arrobaautowired.processor.ComplexRecordProcessor;
import com.arrobaautowired.processor.SimpleRecordProcessor;
import com.arrobaautowired.record.Record;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.batch.core.step.item.SimpleChunkProcessor;
import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;
@Configuration
@IntegrationComponentScan
@EnableIntegration
public class WorkerBatchConfiguration {
@Bean
public ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
factory.setTrustAllPackages(Boolean.TRUE);
return factory;
}
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow jmsIn() {
return IntegrationFlows
.from(Jms
.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requests"))
.channel(requests())
.get();
}
@Bean
public IntegrationFlow outgoingReplies() {
return IntegrationFlows
.from("replies")
.handle(Jms
.outboundGateway(connectionFactory())
.requestDestination("replies"))
.get();
}
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies", sendTimeout = "10000")
public ChunkProcessorChunkHandler<Record> chunkProcessorChunkHandler() {
ChunkProcessorChunkHandler chunkProcessorChunkHandler = new ChunkProcessorChunkHandler();
chunkProcessorChunkHandler.setChunkProcessor(new SimpleChunkProcessor(recordProcessor(), paymentWriter()));
return chunkProcessorChunkHandler;
}
@Bean
public SimpleRecordProcessor recordProcessor() {
return new SimpleRecordProcessor();
}
@Bean
public PaymentWriter paymentWriter() {
return new PaymentWriter();
}
}
Кажется, что все в порядке, но когда ведомое устройство завершает работу с чанком, оно отправляет ответ ведущему устройству, которое завершает работу, и ведомое устройство показывает "ошибку тайм-аута":
2018-09-17 13: 15: 21.509 DEBUG 75729 --- [erContainer#0-1] caprocessor.PaymentWriter: FINALIZADO CHUNK ============================ 2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-4] caprocessor.PaymentWriter: PAYMENT: Payment(fullName=Culver Gapper, bic=ES08 9240 0446 6617 7749 9525, сумма =75189, валюта =€) 2018-09-17 13:15:21.510 ОТЛАДКА 75729 --- [erContainer#0-1] caprocessor.PaymentWriter: PAYMENT: Payment(fullName=Burgess Feldbau, bic=ES62 6361 1904 4990 0753 3877, сумма = ноль, валюта =€) 2018-09-17 13:15:21.510 Отладчик 75729 --- [onPool-worker-0] caprocessor.PaymentWriter: PAYMENT: Оплата (полное имя = Бренда Уодделл, bic = ES23 4535 5585 5095 5691 1491, сумма = 28353, валюта =€) 2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-6] caprocessor.PaymentWriter: ОПЛАТА: Платеж (fullName = Бриттани Блибен, bic=ES88 3076 6115 5504 4561 1796, сумма =86995, валюта =€) 2018-09-17 13:15:21.510 Отладка 75729 --- [onPool-worker-0] caprocessor.PaymentWriter: PAYMENT: Payment (Payment (Payment) fullName=Hortensia Willshee, bic=ES62 7020 0819 9813 3352 2742, сумма = ноль, валюта =€) 2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-4] caprocessor.PaymentWriter: ОПЛАТА: Платеж (полное имя = Розелин Маккри, bic=ES34 5876 6541 1999 9568 8714, сумма =29865, валюта =€) 2018-09-17 13:15:21.510 ОТЛАДКА 75729 --- [erContainer#0-1] caprocessor.PaymentWriter: ОПЛАТА: Оплата (fullName = Джонатан Парлет, bic=ES74 5605 5066 6941 1376 6204, сумма = ноль, валюта =€) 2018-09-17 13:15:21.510 ОТЛАДКА 75729 --- [erContainer#0-1] caprocessor. PaymentWriter: ОПЛАТА: Оплата (fullName=Ilise Semiras, bic=ES59 4689 9344 4052 2235 5296, сумма =10698, валюта =€) 2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-5] caprocessor.PaymentWriter: PAYMENT: Payment (полное имя =Audrey Lempenny, bic=ES45 2456 6470 0023 3823 3629, сумма =56543, валюта =€) 2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-2] caprocessor.PaymentWriter: ОПЛАТА: Оплата (fullName=Hayyim Fetter, bic=ES76 5134 4202 2267 7072 2547, сумма =14662, валюта =€) 2018-09-17 13:15:21.510 ОТЛАДКА 75729 --- [erContainer#0-1] caprocessor.PaymentWriter: ==================================================== 2018-09-17 13: 15: 21.510 ОТЛАДКА 75729 --- [erContainer#0-1] osintegration.ch annel.DirectCh annel: preSend на канале "ответы", сообщение: GenericMessage [payload=ChunkResponse: jobId=1, sequence=0, stepContribution=[StepContribution: read=0, writing =10, отфильтровано =0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], успешно =true, заголовки ={jms_redelivered=true, JMSXDeliveryCount=2, jms_destination= очередь: // запросы, id=16120698-22b4-c615-502b-7c6d050d82c6, приоритет =4, jms_timestamp=1537182878228, jms_messageId=ID:mbp-de-jose.neoris.cxnetworks.net-59228-1537182877872-1:2:1:1:1, отметка времени =1537182981510 09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] osintegration.jms.JmsOutboundGateway: outgoingReplies.org.springframework.integration.jms.JmsOutboundGateway#0 получил сообщение: GenericMess age [payload = ChunkResponse: jobId = 1, sequence = 0, stepContribution = [StepContribution: чтение = 0, запись =10, фильтрация =0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], успешный =true, заголовки ={jms_redelivered=true, JMSXDeliveryCount=2, jms_destination=queue:// запросы, id = 16120698-22b4-c615-502b-7c6d050d82c6, приоритет = 4, jms_timestamp = 1537182878228, jms_messageId = jms_messageId = jms_message.neoris.cxnetworks.net-59228-1537182877872-1: 2: 1: 1: 1, отметка времени =1537182921510}] 2018-09-17 13:15:21.514 ОТЛАДКА 75729 --- [erContainer # 0-1] осинтеграция.jms.JmsOutboundGateway: ReplyTo: временная очередь://ID:mbp-de-jose.neoris.cxnetworks.net-58907-1537181494749-3:24:1 2018-09-17 13:15:26.536 WARN 75729 --- [erContainer#0-1] osjlDefaultMessageListenerContainer: сбой выполнения прослушивателя сообщений JMS, и ErrorHandler не был установлен. org.springframework.integration.MessageTimeoutException: не удалось получить ответ JMS в течение тайм-аута: 5000 мс в org.springframework.integration.jms.JmsOutboundGateway.h andleRequestMessage(JmsOutboundGateway.java:762) ~[spring-интеграции-jELE 5.0.8..jar:5.0.8.RELEASE] at org.springframework.integration.h andler.AbstractReplyProroductionMessageHandler.h andleMessageInternal(AbstractReplyProroductionMessageHandler.java:109) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8].RELE в org.springframework.integration.h andler.AbstractMessageHandler.h andleMessage(AbstractMessageHandler.java:158) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java)) ~ [spring -gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE ] в org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.ch annel.AbstractSubscribeableCh annel.doSend(AbstractSubscribeableCh annel.java:73) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.ch annel.AbstractMessageCh annel.send(AbstractMessageCh annel.jpg 445) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] по адресу org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] по org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] в org.springframework.integration.h andler.AbstractMessageProedingHandputMand:426) ~[spring-интеграция-ядро-5.0.8.RELEASE.jar: 5.0.8.RELEASE] в org.springframework.integration.h andler.AbstractMessageProroductionHandler.produceOutput(AbstractMessageProroductionHandler.java:336) ~[spring-gration- core-5.0.8.RELEASE.jar: 5.0.8.RELEASE] в org.springframework.integration.h andler.AbstractMessageProDUCHandler.sendOutputs(AbstractMessageProumingHandler.java:227) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] at org.springframework.integration.h andler.AbstractReplyProroductionMessageHandler.h andleMessageInternal(AbstractReplyProductivityMessageHandler.java:115) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE].springframework.integration.h andler.AbstractMessageHandler.h andleMessage (Тез actMessageHandler.java:158) ~ [spring -gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] по адресу org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-gration-core-5.0.8. RELEASE.jar: 5.0.8.RELEASE] в org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] at org.springframework.integration.ch annel.AbstractSubscribeableCh annel.doSend(AbstractSubscribeableCh annel.java:73) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.ch annel.AbstractMessageCh annel.send(AbstractMessageChannel.java:445) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.ch annel.AbstractMessageCh annel.send(AbstractMessageCh annel.java:394) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.messaging.core.GenericMessagingTemplate.doSend (GenericMessagingTemplate.java:181) ~ [spring-messaging-5.0.9.RELEASE.jar: 5.0.9.RELEASE] по адресу org.springframework.messaging.core.GenericMessagingTemplate.doSend (GenericMessagingTemplate.java:160) ~ [spring-messaging-5.0.9.RELEASE. jar: 5.0.9.RELEASE] в org.springframework.messaging.core.GenericMessagingTemplate.doSend (GenericMessagingTemplate.java:47) ~ [spring-messaging-5.0.9.RELEASE.jar: 5.0.9.RELEASE] в орг. springframework.messaging.core.AbstractMessageSendingTemplate.send (AbstractMessageSendingTemplate.java:108) ~ [spring-messaging-5.0.9.RELEASE.jar: 5.0.9.RELEASE] в org.springframework.integration.h andler.AbstractMessageOh.java:426) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.h andler.AbstractMessageProducHandler.produc eOutput (AbstractMessageProumingHandler.java:336) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.h andler.AbstractMessageProroductionHandler.sendOutputs(AbstractMessageProbingHandler.java:22: 22 [spring -gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] по адресу org.springframework.integration.h andler.AbstractReplyProroductionMessageHandler.h andleMessageInternal(AbstractReplyProroductionMessageHandler.java:115) ~ [spring-интеграция-ядро-5.0. 8.RELEASE.jar: 5.0.8.RELEASE] в org.springframework.integration.h andler.AbstractMessageHandler.h andleMessage(AbstractMessageHandler.java:158) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch (AbstractDispatcher.java:116) ~ [spring -gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-gration-core-5.0.8.RELEASE. jar: 5.0.8.RELEASE] на org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.ch annel.AbstractSubscribeableCh annel.doSend(AbstractSubscribeableCh annel.java:73) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.integration.ch annel.Ah anstractMessage.send (AbstractMessageCh annel.java:445) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.messaging.core.GenericMessagingTemplate.doSend (GenericMessagingTemplate.java:181) ~ [spring-messaging-5.0.9.RELEASE.jar: 5.0.9.RELEASE] в org.springframework.messaging.core.GenericMessagingTemplate.doSend (GenericMessagingTemplate.java:160) ~ [spring-messaging-5.0.9.RELEASE.jar: 5.0.9.RELEASE] в org.springframework.messaging.core.GenericMessagingTemplate.doSend (GenericMessagingTemplate.java:47) ~ [spring-messaging-5.0.9.RELEASE.jar: 5.0.9.RELEASE] в организации.с pringframework.messaging.core.AbstractMessageSendingTemplate.send (AbstractMessageSendingTemplate.java:108) ~ [spring-messaging-5.0.9.RELEASE.jar: 5.0.9.RELEASE] в org.springframework.messaging.core.AendingTessSageSenseTessageSage.java:150) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] в org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:142) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] по адресу org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:415) ~[spring-gration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] at org.springframework.integration.jms.ChannelPublishingJmsMessageListener$GatewayDelegate.send(ChannelPublishingJmsMessageListener.java:511) ~[spring-gration-jms-5.0.8.RELEASE.jar:5.0.8.RELEASE].springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:341) ~[spring-gration-jms-5.0.8.RELEASE.jar:5.0.8.RELEASE] в org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] в org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] в org.springfra.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] в org.springframework.jmsertainLexingLexerLex ист [spring-jms-5.0.9.RELEASE.jar: 5.0.9.RELEASE] на org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189) [spring-jms-5.0.9.RELEASE.jar:5.0.9. java:1179) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] в org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076-5.0 jns.9.RELEASE.jar:5.0.9.RELEASE] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
Код доступен в ( https://github.com/jamataran/spring-batch-scale)[https://github.com/jamataran/spring-batch-scale]
1 ответ
Я думаю, что ваша проблема на worker
сторона здесь:
@Bean
public IntegrationFlow outgoingReplies() {
return IntegrationFlows
.from("replies")
.handle(Jms
.outboundGateway(connectionFactory())
.requestDestination("replies"))
.get();
}
Вы просто отправляете ответы, и вы не получаете ничего кроме мастера.
Это должно быть односторонним
Jms
.outboundAdapter(connectionFactory())
.destination("replies")`