Весенняя партия пишет в ActiveMQ
Я пытаюсь записать сообщения в очередь JMS, которые я вывожу на следующем шаге для записи в базу данных. Первая часть должна быть синхронизирована во второй асинхронной. Часть JMS очень медленная (1100 элементов в очереди за 1 минуту).
Так выглядит моя работа.
@Bean
public Job multiThreadedStepJob() {
Flow flow1 = new FlowBuilder<Flow>("subflow1").from(step()).end();
Flow flow2 = new FlowBuilder<Flow>("subflow2").from(step2()).end();
Flow splitFlow = new FlowBuilder<Flow>("splitflow")
.split(new SimpleAsyncTaskExecutor()).add(flow1, flow2) .build();
return jobBuilders.get("multiThreadedStepJob")
.start(splitFlow).end().build();
}
Первый шаг:
@Bean
public Step step() {
return stepBuilders.get("step")
.<OrderDTO, OrderDTO>chunk(CHUNK_SIZE)
.reader(reader())
.writer(writer())
.build();
}
второй шаг:
@Bean
public Step step2() {
return stepBuilders.get("step2")
.<OrderDTO, OrderDTO>chunk(100)
.reader(reader2())
.writer(writer2())
.build();
}
Я думаю, что мои ошибки находятся внутри пишущего устройства step и reader of step2, потому что я могу запустить другого читателя и writer вместе, и у меня нет проблем.
@Bean
public JmsItemWriter<OrderDTO> writer() {
JmsItemWriter<OrderDTO> itemWriter = new JmsItemWriter<>();
itemWriter.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
return itemWriter;
}
@Bean
public JmsItemReader<OrderDTO> reader2() {
JmsItemReader<OrderDTO> itemReader = new JmsItemReader<>();
itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
itemReader.setItemType(OrderDTO.class);
return itemReader;
}
Они используют один и тот же JmsTemplate для подключения к очереди:
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
jmsTemplate.setDefaultDestination(queue());
jmsTemplate.setReceiveTimeout(500);
return jmsTemplate;
}
@Bean
public Queue queue() {
return new ActiveMQQueue("orderList");
}
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
factory.setTrustAllPackages(true);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(30);
factory.setPrefetchPolicy(prefetchPolicy);
PooledConnectionFactory pool = new PooledConnectionFactory(factory);
pool.setMaxConnections(10);
pool.setMaximumActiveSessionPerConnection(10);
pool.isCreateConnectionOnStartup();
return pool;
}
Остальная часть конфигурации, которую я использую, является конфигурацией из @EnableBatchProcessing. Кто-нибудь знает, почему это так медленно?
1 ответ
По-видимому, jmsTemplate.setSessionTransacted(true); действительно важно. Это значительно ускорило запись и чтение из очереди JMS. По какой-то причине я думал, что по умолчанию будет верно, потому что я работаю с партиями.
В любом случае, если у кого-то еще есть эта проблема, проверьте это сначала, потому что это легко забыть.