Потокобезопасный SpringReatch ItemReader (шаблон индикатора процесса)
Я уже реализован Remote Chunking
используя AMQP (RabbitMQ). Теперь мне нужно запустить параллельные задания из веб-контейнера.
Мой простой контроллер (testJob
использовать дистанционное разбиение на блоки):
@Controller
public class JobController {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job testJob;
@RequestMapping("/job/test")
public void test() {
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addDate("date",new Date());
try {
jobLauncher.run(personJob,jobParametersBuilder.toJobParameters());
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | JobInstanceAlreadyCompleteException e) {
e.printStackTrace();
}
}
}
testJob
читает данные из файловой системы (главный блок) и отправляет их на удаленный блок (подчиненный блок). Проблема в том, что ItemReader
не является потокобезопасным.
Существуют некоторые практические ограничения использования многопоточных шагов для некоторых распространенных случаев пакетного использования. Многие участники шага (например, читатели и писатели) имеют состояние, и если состояние не разделено по потокам, то эти компоненты не могут использоваться в многопоточном шаге. В частности, большинство стандартных ридеров и писателей Spring Batch не предназначены для многопоточного использования. Тем не менее, можно работать с читателями и записывающими устройствами без сохранения состояния или с поддержкой потоков, и в примерах Spring Batch есть образец (parallelJob), который демонстрирует использование индикатора процесса (см. Раздел 6.12, "Предотвращение сохранения состояния") для отслеживать элементы, которые были обработаны во входной таблице базы данных.
Я рассматриваю пример с параллельным заданием в весеннем пакетном репозитории github https://github.com/spring-projects/spring-batch/blob/master/spring-batch-samples/src/main/java/org/springframework/batch/sample/common/StagingItemReader.java
Я немного запутался по поводу шаблона индикатора процесса. Где я могу найти более подробную информацию об этом шаблоне?
1 ответ
Если все, что вас беспокоит, это то, что ItemReader
Экземпляр будет разделен между вызовами работы, вы можете объявить ItemReader
в качестве шага, и вы получите новый экземпляр для каждого вызова, который устранит проблемы с потоками.
Но чтобы ответить на ваш прямой вопрос о шаблоне индикатора процесса, я не уверен, где находится хорошая документация по нему. В Spring Batch Samples есть пример его реализации (он используется в параллельном задании).
Идея заключается в том, что вы предоставляете статус записям, которые собираетесь обрабатывать. В начале задания / шага вы отмечаете эти записи как находящиеся в процессе. Когда записи зафиксированы, вы помечаете их как обработанные. Это устраняет необходимость отслеживать состояние в считывателе, поскольку ваше состояние фактически находится в БД (ваш запрос ищет только записи, отмеченные как находящиеся в процессе обработки).