Spring Batch: многопоточный шаг с AsyncItemProcessor не выполняется параллельно
TL;DR
Для файла с миллионом повторений, в котором для каждой строки файла необходимо выполнить значительный объем логики, каков самый быстрый способ прочитать файл и завершить применение логики к каждой строке. Я использовал многопоточный шаг с файловым ридером,read
метод synchronized
чтобы прочитать файл, а также использовал AsynItemProcessor
так что записи обрабатываются в собственном потоке.
Я ожидаю, что AsynItemProcessor
должен начаться немедленно, как только будет обработана запись от читателя. Каждая запись должна обрабатываться в собственном потоке; однако это не похоже на мой пример ниже
У меня есть шаг в моем пакетном задании Spring, который использует TaskExecutor
с 20 потоками и интервалом фиксации 10000 для чтения файла. Я также используюAsycnItemProcessor
а также AsyncItemWriter
поскольку обработка данных иногда может занимать больше времени, чем требуется для чтения строки из файла.
<step id="aggregationStep">
<tasklet throttle-limit="20" task-executor="taskExecutor">
<chunk reader="fileReader"
processor="asyncProcessor" writer="asyncWriter"
commit-interval="10000" />
</tasklet>
</step>
Где:
fileReader
это класс, который расширяетFlatFileItemReader
иread
методsynchronized
и просто звонитsuper.read
внутри.asyncProcessor
как не что иное, какAsyncItemProcessor
bean, который передает каждую строку из файла, группирует ее по ключу и сохраняет в одноэлементном bean-компоненте, который содержитMap<String,BigDecimal>
объект. Другими словами, процессор просто группирует данные файла по нескольким столбцам и сохраняет эти данные в памяти.asyncWriter
не что иное, какAsyncItemWriter
что завершает операцию без операцииItemWriter
внутри. Другими словами, задание не требует какой-либо записи, поскольку сам процессор выполняет агрегирование и сохраняет данные в памяти. (Map
).- Обратите внимание, что
AsynItemProcessor
есть наThreadPoolTaskExecutor
сcorePoolSize=10
а такжеmaxPoolSize=20
иStep
есть свойThreadPoolTaskExecutor
сcorePoolSize=20
а такжеmaxPoolSize=40
С приведенной выше настройкой я исключал, что чтение и обработка будут происходить параллельно. Что-то типа:
- FileReader читает запись из файла и передает ее процессору
AsyncItemProcessor
выполняет агрегирование. Поскольку этоAsyncItemProcessor
, поток, который вызвалprocess
метод в идеале должен быть бесплатным для выполнения другой работы?- Наконец,
AsynItemWriter
получитFuture
и извлеките данные, но ничего не сделайте, поскольку делегат не выполняет операциюItemWriter
.
Но когда я добавил несколько журналов, я не увидел того, чего ожидал:
2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 2500 records
2019-09-07 10:04:49 INFO FileReader:45 - Finished reading 5000 records
2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 7501 records
2019-09-07 10:04:50 INFO FileReader:45 - Finished reading 10000 records
2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 12500 records
2019-09-07 10:04:51 INFO FileReader:45 - Finished reading 15000 records
... больше таких строк печатаются, пока не будет прочитан весь файл. Только после чтения файла я начинаю видеть, как процессор начинает делать свою работу:
2019-09-07 10:06:53 INFO FileProcessor:83 - Finished processing 2500 records
2019-09-07 10:08:28 INFO FileProcessor:83 - Finished processing 5000 records
2019-09-07 10:10:04 INFO FileProcessor:83 - Finished processing 7500 records
2019-09-07 10:11:40 INFO FileProcessor:83 - Finished processing 10000 records
2019-09-07 10:13:16 INFO FileProcessor:83 - Finished processing 12500 records
2019-09-07 10:14:51 INFO FileProcessor:83 - Finished processing 15000 records
Итог: почему процессор не запускается, пока файл не будет полностью прочитан? Независимо от того, чтоThreadPoolTaskExecutor
параметры, используемые для AsynItemProcessor
или для всего step
, чтение всегда завершается первым, и только потом начинается обработка.
1 ответ
Так работает обработка, ориентированная на фрагменты. Шаг будет читать X элементов в переменной (где X - интервал фиксации), затем выполняется обработка / запись. Вы можете видеть это в кодеChunkOrientedTasklet
.
В многопоточном шаге каждый фрагмент будет обрабатываться потоком.