Могу ли я использовать FlatfileItemReader с Taskexecutor?

Могу ли я использовать FlatfileItemReader с Taskexecutor в весенней партии??

Я реализовал FlatFileItemReader с ThreadPoolTaskExecutor. Когда я печатаю записи в ItemProcessor, я не получаю согласованных результатов, т.е. не все записи печатаются, а иногда одна из записей печатается более одного раза. Это приводит меня к тому факту, что FlatFileItemReader не является потокобезопасным, и также он говорит то же самое в весенних документах, но я вижу некоторые блоги, где говорится, что можно использовать FlatFileItemReader с Task Executor.

Итак, мой вопрос: можно ли вообще использовать FlatfileItemReader с Task Executor?

    @Bean
    @StepScope
    public FlatFileItemReader<DataLifeCycleEvent> csvFileReader(
            @Value("#{stepExecution}") StepExecution stepExecution) {

        Resource inputResource;
        FlatFileItemReader<DataLifeCycleEvent> itemReader = new FlatFileItemReader<>();

        itemReader.setLineMapper(new OnboardingLineMapper(stepExecution));
        itemReader.setLinesToSkip(1);
        itemReader.setSaveState(false);
        itemReader.setSkippedLinesCallback(new OnboardingHeaderMapper(stepExecution));
        String inputResourceString = stepExecution.getJobParameters().getString("inputResource");
        inputResource = new FileSystemResource(inputFileLocation + ApplicationConstant.SLASH + inputResourceString);
        itemReader.setResource(inputResource);
        stepExecution.getJobExecution().getExecutionContext().putInt(ApplicationConstant.ERROR_COUNT, 0);
        return itemReader;
    }

2 ответа

Решение

FlatFileItemReader расширяет AbstractItemCountingItemStreamItemReaderкоторый НЕ является потокобезопасным. Поэтому, если вы используете его в многопоточном режиме, вам необходимо синхронизировать его.

Вы можете завернуть его в SynchronizedItemStreamReader. Вот краткий пример:

@Bean
public SynchronizedItemStreamReader<DataLifeCycleEvent> itemReader() {
    FlatFileItemReader<DataLifeCycleEvent> itemReader = ... // your item reader

    SynchronizedItemStreamReader<DataLifeCycleEvent> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
    synchronizedItemStreamReader.setDelegate(itemReader);
    return synchronizedItemStreamReader;
}

Этот метод дает следующее исключение:-java.lang.ClassCastException: com.sun.proxy.$Proxy344 не может быть преобразован в org.springframework.batch.item.support.SynchronizedItemStreamReader

      @Bean
public SynchronizedItemStreamReader<DataLifeCycleEvent> itemReader() {
    FlatFileItemReader<DataLifeCycleEvent> itemReader = ... // your item reader

    SynchronizedItemStreamReader<DataLifeCycleEvent> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
    synchronizedItemStreamReader.setDelegate(itemReader);
    return synchronizedItemStreamReader;
}
Другие вопросы по тегам