весенняя партия застревает в параллельной обработке, где отлично работает при последовательной обработке

Я новичок в Spring Batch и пытался запустить Spring Batch с одним потоком. Теперь мне нужно добавить многопоточность по шагам и настроить конфигурацию ниже, но параллельная обработка через некоторое время зависает, и после обработки некоторых записей на консоли не остается никаких следов. Раньше для однопоточного чтения я использовал JdbcCursorItemReader, а затем переключился на JdbcPagingItemReader для поточно-безопасного чтения. Читатель читает записи из базы данных postgres, а затем процессор (который вызывает другой остальной веб-сервис и возвращает ответ писателю) и писатель (который создает новый файл и обновляет данные статуса в БД) могут выполняться параллельно.

          @Bean
  public Job job(JobBuilderFactory jobBuilderFactory,
      StepBuilderFactory stepBuilderFactory,
      ItemReader<OrderRequest> itemReader,
      ItemProcessor<OrderRequest, OrderResponse> dataProcessor,
      ItemWriter<OrderResponse> fileWriter, JobExecutionListener jobListener,
      ItemReadListener<OrderRequest> stepItemReadListener,
      SkipListener<OrderRequest, OrderResponse> stepSkipListener, TaskExecutor taskExecutor) {


    Step step1 = stepBuilderFactory.get("Process-Data")
        .<OrderRequest, OrderResponse>chunk(10)
        .listener(stepItemReadListener)
        .reader(itemReader)
        .processor(dataProcessor)
        .writer(fileWriter)
        .faultTolerant()
        .processorNonTransactional()
        .skipLimit(5)
        .skip(CustomException.class)
        .listener(stepSkipListener)
        .taskExecutor(taskExecutor)
        .throttleLimit(5)
        .build();

    return jobBuilderFactory.get("Batch-Job")
        .incrementer(new RunIdIncrementer())
        .listener(jobListener)
        .start(step1)
        .build();
  }
  
  @StepScope
  @Bean
  public JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader(@Qualifier("postgresDataSource") DataSource dataSource,
      @Value("#{jobParameters[customerId]}") String customerId, OrderRequestRowMapper rowMapper) {

    // reading database records using JDBC in a paging fashion

    JdbcPagingItemReader<OrderRequest> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(dataSource);
    reader.setFetchSize(1000);
    reader.setRowMapper(rowMapper);

    // Sort Keys
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("OrderRequestID", Order.ASCENDING);

    // Postgres implementation of a PagingQueryProvider using database specific features.

    PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
    queryProvider.setSelectClause("*");
    queryProvider.setFromClause("FROM OrderRequest");
    queryProvider.setWhereClause("CUSTOMER = '" + customerId + "'");
    queryProvider.setSortKeys(sortKeys);
    reader.setQueryProvider(queryProvider);
    return reader;
  }
 
 @StepScope
  @Bean
  public SynchronizedItemStreamReader<OrderRequest> itemReader(JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader) {
    return new SynchronizedItemStreamReaderBuilder<OrderRequest>().delegate(jdbcPagingItemReader).build();
  }
  
  @Bean
  public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.setMaxPoolSize(5);
    taskExecutor.setQueueCapacity(0);
    return taskExecutor;
  }

  @StepScope
  @Bean
  ItemProcessor<OrderRequest, OrderResponse> dataProcessor() {
    return new BatchDataFileProcessor();
  }

  @StepScope
  @Bean
  ItemWriter<OrderResponse> fileWriter() {
    return new BatchOrderFileWriter();
  }


  @StepScope
  @Bean
  public ItemReadListener<OrderRequest> stepItemReadListener() {
    return new StepItemReadListener();
  }


  @Bean
  public JobExecutionListener jobListener() {
    return new JobListener();
  }

  @StepScope
  @Bean
  public SkipListener<OrderRequest, OrderResponse> stepSkipListener() {
    return new StepSkipListener();
  }

В чем проблема с конфигурацией многопоточности? Пакетная обработка отлично работает с одной записью одновременно при использовании JdbcCursorItemReader и без компонента TaskExecutor:

      @StepScope
  @Bean
  public JdbcCursorItemReader<OrderRequest> jdbcCursorItemReader(@Qualifier("postgresDataSource") DataSource dataSource,
      @Value("#{jobParameters[customerId]}") String customerId, OrderRequestRowMapper rowMapper) {

    return new JdbcCursorItemReaderBuilder<OrderRequest>()
        .name("jdbcCursorItemReader")
        .dataSource(dataSource)
        .queryArguments(customerId)
        .sql(CommonConstant.FETCH_QUERY)
        .rowMapper(rowMapper)
        .saveState(true)
        .build();
  }

1 ответ

После изменения TaskExecutor теперь он работает следующим образом:

      @Bean
  public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(concurrencyLimit);
    return taskExecutor;
  }

Не понял, с чем была проблема раньше.

Другие вопросы по тегам