Весенняя партия пишет в ActiveMQ

Я пытаюсь записать сообщения в очередь JMS, которые я вывожу на следующем шаге для записи в базу данных. Первая часть должна быть синхронизирована во второй асинхронной. Часть JMS очень медленная (1100 элементов в очереди за 1 минуту).

Так выглядит моя работа.

@Bean
public Job multiThreadedStepJob() { 
   Flow flow1 = new FlowBuilder<Flow>("subflow1").from(step()).end(); 
   Flow flow2 = new FlowBuilder<Flow>("subflow2").from(step2()).end();
   Flow splitFlow = new FlowBuilder<Flow>("splitflow")
   .split(new SimpleAsyncTaskExecutor()).add(flow1, flow2) .build();

   return jobBuilders.get("multiThreadedStepJob")
                          .start(splitFlow).end().build();

}

Первый шаг:

@Bean
public Step step() {
     return stepBuilders.get("step")
         .<OrderDTO, OrderDTO>chunk(CHUNK_SIZE)
         .reader(reader())
         .writer(writer())   
         .build();
}

второй шаг:

@Bean
public Step step2() {
    return stepBuilders.get("step2")
            .<OrderDTO, OrderDTO>chunk(100)
            .reader(reader2())
            .writer(writer2())
            .build();
}

Я думаю, что мои ошибки находятся внутри пишущего устройства step и reader of step2, потому что я могу запустить другого читателя и writer вместе, и у меня нет проблем.

@Bean
public JmsItemWriter<OrderDTO> writer() {
    JmsItemWriter<OrderDTO> itemWriter = new JmsItemWriter<>();
    itemWriter.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
    return itemWriter;
}

@Bean
public JmsItemReader<OrderDTO> reader2() {
    JmsItemReader<OrderDTO> itemReader = new JmsItemReader<>();
    itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
    itemReader.setItemType(OrderDTO.class);
    return itemReader;
}

Они используют один и тот же JmsTemplate для подключения к очереди:

@Bean
public JmsTemplate jmsTemplate() {
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
    jmsTemplate.setDefaultDestination(queue());
    jmsTemplate.setReceiveTimeout(500);
    return jmsTemplate;
}

@Bean
public Queue queue() {
    return new ActiveMQQueue("orderList");
}

@Bean
public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
    factory.setTrustAllPackages(true);

    ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    prefetchPolicy.setQueuePrefetch(30);

    factory.setPrefetchPolicy(prefetchPolicy);

    PooledConnectionFactory pool = new PooledConnectionFactory(factory);
    pool.setMaxConnections(10);
    pool.setMaximumActiveSessionPerConnection(10);
    pool.isCreateConnectionOnStartup();

    return pool;
}

Остальная часть конфигурации, которую я использую, является конфигурацией из @EnableBatchProcessing. Кто-нибудь знает, почему это так медленно?

1 ответ

По-видимому, jmsTemplate.setSessionTransacted(true); действительно важно. Это значительно ускорило запись и чтение из очереди JMS. По какой-то причине я думал, что по умолчанию будет верно, потому что я работаю с партиями.

В любом случае, если у кого-то еще есть эта проблема, проверьте это сначала, потому что это легко забыть.

Другие вопросы по тегам