Потокобезопасный SpringReatch ItemReader (шаблон индикатора процесса)

Я уже реализован Remote Chunking используя AMQP (RabbitMQ). Теперь мне нужно запустить параллельные задания из веб-контейнера.

Мой простой контроллер (testJob использовать дистанционное разбиение на блоки):

@Controller
public class JobController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job testJob;

    @RequestMapping("/job/test")
    public void test() {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addDate("date",new Date());
        try {
            jobLauncher.run(personJob,jobParametersBuilder.toJobParameters());
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | JobInstanceAlreadyCompleteException e) {
            e.printStackTrace();
        }

    }

}

testJob читает данные из файловой системы (главный блок) и отправляет их на удаленный блок (подчиненный блок). Проблема в том, что ItemReader не является потокобезопасным.

Существуют некоторые практические ограничения использования многопоточных шагов для некоторых распространенных случаев пакетного использования. Многие участники шага (например, читатели и писатели) имеют состояние, и если состояние не разделено по потокам, то эти компоненты не могут использоваться в многопоточном шаге. В частности, большинство стандартных ридеров и писателей Spring Batch не предназначены для многопоточного использования. Тем не менее, можно работать с читателями и записывающими устройствами без сохранения состояния или с поддержкой потоков, и в примерах Spring Batch есть образец (parallelJob), который демонстрирует использование индикатора процесса (см. Раздел 6.12, "Предотвращение сохранения состояния") для отслеживать элементы, которые были обработаны во входной таблице базы данных.

Я рассматриваю пример с параллельным заданием в весеннем пакетном репозитории github https://github.com/spring-projects/spring-batch/blob/master/spring-batch-samples/src/main/java/org/springframework/batch/sample/common/StagingItemReader.java

Я немного запутался по поводу шаблона индикатора процесса. Где я могу найти более подробную информацию об этом шаблоне?

1 ответ

Решение

Если все, что вас беспокоит, это то, что ItemReader Экземпляр будет разделен между вызовами работы, вы можете объявить ItemReader в качестве шага, и вы получите новый экземпляр для каждого вызова, который устранит проблемы с потоками.

Но чтобы ответить на ваш прямой вопрос о шаблоне индикатора процесса, я не уверен, где находится хорошая документация по нему. В Spring Batch Samples есть пример его реализации (он используется в параллельном задании).

Идея заключается в том, что вы предоставляете статус записям, которые собираетесь обрабатывать. В начале задания / шага вы отмечаете эти записи как находящиеся в процессе. Когда записи зафиксированы, вы помечаете их как обработанные. Это устраняет необходимость отслеживать состояние в считывателе, поскольку ваше состояние фактически находится в БД (ваш запрос ищет только записи, отмеченные как находящиеся в процессе обработки).

Другие вопросы по тегам