Spring Batch: многопоточный шаг с AsyncItemProcessor не выполняется параллельно

TL;DR

Для файла с миллионом повторений, в котором для каждой строки файла необходимо выполнить значительный объем логики, каков самый быстрый способ прочитать файл и завершить применение логики к каждой строке. Я использовал многопоточный шаг с файловым ридером,read метод synchronized чтобы прочитать файл, а также использовал AsynItemProcessor так что записи обрабатываются в собственном потоке.

Я ожидаю, что AsynItemProcessorдолжен начаться немедленно, как только будет обработана запись от читателя. Каждая запись должна обрабатываться в собственном потоке; однако это не похоже на мой пример ниже


У меня есть шаг в моем пакетном задании Spring, который использует TaskExecutorс 20 потоками и интервалом фиксации 10000 для чтения файла. Я также используюAsycnItemProcessor а также AsyncItemWriter поскольку обработка данных иногда может занимать больше времени, чем требуется для чтения строки из файла.

<step id="aggregationStep">
    <tasklet throttle-limit="20" task-executor="taskExecutor">
        <chunk reader="fileReader"
            processor="asyncProcessor" writer="asyncWriter"
            commit-interval="10000" />
    </tasklet>
</step>

Где:

  1. fileReader это класс, который расширяет FlatFileItemReader и read метод synchronized и просто звонит super.read внутри.
  2. asyncProcessor как не что иное, как AsyncItemProcessor bean, который передает каждую строку из файла, группирует ее по ключу и сохраняет в одноэлементном bean-компоненте, который содержит Map<String,BigDecimal>объект. Другими словами, процессор просто группирует данные файла по нескольким столбцам и сохраняет эти данные в памяти.
  3. asyncWriter не что иное, как AsyncItemWriter что завершает операцию без операции ItemWriterвнутри. Другими словами, задание не требует какой-либо записи, поскольку сам процессор выполняет агрегирование и сохраняет данные в памяти. (Map).
  4. Обратите внимание, что AsynItemProcessor есть на ThreadPoolTaskExecutor с corePoolSize=10 а также maxPoolSize=20 и Step есть свой ThreadPoolTaskExecutor с corePoolSize=20 а также maxPoolSize=40

С приведенной выше настройкой я исключал, что чтение и обработка будут происходить параллельно. Что-то типа:

  1. FileReader читает запись из файла и передает ее процессору
  2. AsyncItemProcessorвыполняет агрегирование. Поскольку этоAsyncItemProcessor, поток, который вызвал process метод в идеале должен быть бесплатным для выполнения другой работы?
  3. Наконец, AsynItemWriter получит Future и извлеките данные, но ничего не сделайте, поскольку делегат не выполняет операцию ItemWriter.

Но когда я добавил несколько журналов, я не увидел того, чего ожидал:

2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 2500 records 2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 5000 records 2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 7501 records 2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 10000 records 2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 12500 records 2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 15000 records

... больше таких строк печатаются, пока не будет прочитан весь файл. Только после чтения файла я начинаю видеть, как процессор начинает делать свою работу:

2019-09-07 10:06:53 INFO FileProcessor:83 - Finished processing 2500 records 2019-09-07 10:08:28 INFO FileProcessor:83 - Finished processing 5000 records 2019-09-07 10:10:04 INFO FileProcessor:83 - Finished processing 7500 records 2019-09-07 10:11:40 INFO FileProcessor:83 - Finished processing 10000 records 2019-09-07 10:13:16 INFO FileProcessor:83 - Finished processing 12500 records 2019-09-07 10:14:51 INFO FileProcessor:83 - Finished processing 15000 records

Итог: почему процессор не запускается, пока файл не будет полностью прочитан? Независимо от того, чтоThreadPoolTaskExecutor параметры, используемые для AsynItemProcessor или для всего step, чтение всегда завершается первым, и только потом начинается обработка.

1 ответ

Так работает обработка, ориентированная на фрагменты. Шаг будет читать X элементов в переменной (где X - интервал фиксации), затем выполняется обработка / запись. Вы можете видеть это в кодеChunkOrientedTasklet.

В многопоточном шаге каждый фрагмент будет обрабатываться потоком.

Другие вопросы по тегам