весенняя партия застревает в параллельной обработке, где отлично работает при последовательной обработке
Я новичок в Spring Batch и пытался запустить Spring Batch с одним потоком. Теперь мне нужно добавить многопоточность по шагам и настроить конфигурацию ниже, но параллельная обработка через некоторое время зависает, и после обработки некоторых записей на консоли не остается никаких следов. Раньше для однопоточного чтения я использовал JdbcCursorItemReader, а затем переключился на JdbcPagingItemReader для поточно-безопасного чтения. Читатель читает записи из базы данных postgres, а затем процессор (который вызывает другой остальной веб-сервис и возвращает ответ писателю) и писатель (который создает новый файл и обновляет данные статуса в БД) могут выполняться параллельно.
@Bean
public Job job(JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory,
ItemReader<OrderRequest> itemReader,
ItemProcessor<OrderRequest, OrderResponse> dataProcessor,
ItemWriter<OrderResponse> fileWriter, JobExecutionListener jobListener,
ItemReadListener<OrderRequest> stepItemReadListener,
SkipListener<OrderRequest, OrderResponse> stepSkipListener, TaskExecutor taskExecutor) {
Step step1 = stepBuilderFactory.get("Process-Data")
.<OrderRequest, OrderResponse>chunk(10)
.listener(stepItemReadListener)
.reader(itemReader)
.processor(dataProcessor)
.writer(fileWriter)
.faultTolerant()
.processorNonTransactional()
.skipLimit(5)
.skip(CustomException.class)
.listener(stepSkipListener)
.taskExecutor(taskExecutor)
.throttleLimit(5)
.build();
return jobBuilderFactory.get("Batch-Job")
.incrementer(new RunIdIncrementer())
.listener(jobListener)
.start(step1)
.build();
}
@StepScope
@Bean
public JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader(@Qualifier("postgresDataSource") DataSource dataSource,
@Value("#{jobParameters[customerId]}") String customerId, OrderRequestRowMapper rowMapper) {
// reading database records using JDBC in a paging fashion
JdbcPagingItemReader<OrderRequest> reader = new JdbcPagingItemReader<>();
reader.setDataSource(dataSource);
reader.setFetchSize(1000);
reader.setRowMapper(rowMapper);
// Sort Keys
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("OrderRequestID", Order.ASCENDING);
// Postgres implementation of a PagingQueryProvider using database specific features.
PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
queryProvider.setSelectClause("*");
queryProvider.setFromClause("FROM OrderRequest");
queryProvider.setWhereClause("CUSTOMER = '" + customerId + "'");
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
@StepScope
@Bean
public SynchronizedItemStreamReader<OrderRequest> itemReader(JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader) {
return new SynchronizedItemStreamReaderBuilder<OrderRequest>().delegate(jdbcPagingItemReader).build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(5);
taskExecutor.setQueueCapacity(0);
return taskExecutor;
}
@StepScope
@Bean
ItemProcessor<OrderRequest, OrderResponse> dataProcessor() {
return new BatchDataFileProcessor();
}
@StepScope
@Bean
ItemWriter<OrderResponse> fileWriter() {
return new BatchOrderFileWriter();
}
@StepScope
@Bean
public ItemReadListener<OrderRequest> stepItemReadListener() {
return new StepItemReadListener();
}
@Bean
public JobExecutionListener jobListener() {
return new JobListener();
}
@StepScope
@Bean
public SkipListener<OrderRequest, OrderResponse> stepSkipListener() {
return new StepSkipListener();
}
В чем проблема с конфигурацией многопоточности? Пакетная обработка отлично работает с одной записью одновременно при использовании JdbcCursorItemReader и без компонента TaskExecutor:
@StepScope
@Bean
public JdbcCursorItemReader<OrderRequest> jdbcCursorItemReader(@Qualifier("postgresDataSource") DataSource dataSource,
@Value("#{jobParameters[customerId]}") String customerId, OrderRequestRowMapper rowMapper) {
return new JdbcCursorItemReaderBuilder<OrderRequest>()
.name("jdbcCursorItemReader")
.dataSource(dataSource)
.queryArguments(customerId)
.sql(CommonConstant.FETCH_QUERY)
.rowMapper(rowMapper)
.saveState(true)
.build();
}
1 ответ
После изменения TaskExecutor теперь он работает следующим образом:
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(concurrencyLimit);
return taskExecutor;
}
Не понял, с чем была проблема раньше.