Как запустить Spring Batch Job асинхронно
Я следовал за весенним пакетным документом и не мог выполнить свою работу Асинхронно.
Поэтому я запускаю задание из веб-контейнера, и задание будет запущено через конечную точку REST.
Я хотел получить идентификатор JobInstance, чтобы передать его в ответ, прежде чем завершить всю работу. Таким образом, они могут проверить статус задания позже с идентификатором JobInstance вместо ожидания. Но я не мог заставить это работать. Ниже приведен пример кода, который я пробовал. Пожалуйста, дайте мне знать, что я пропустил или неправильно.
BatchConfig, чтобы сделать Async JobLauncher
@Configuration
public class BatchConfig {
@Autowired
JobRepository jobRepository;
@Bean
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}
контроллер
@Autowired
JobLauncher jobLauncher;
@RequestMapping(value="/trigger-job", method = RequestMethod.GET)
public Long workHard() throws Exception {
JobParameters jobParameters = new JobParametersBuilder().
addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
System.out.println(jobExecution.getJobInstance().getInstanceId());
System.out.println("OK RESPONSE");
return jobExecution.getJobInstance().getInstanceId();
}
И JobBuilder как компонент
@Component
public class BatchComponent {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
public Job customJob(String someParam) throws Exception {
return jobBuilderFactory.get("personProcessor")
.incrementer(new RunIdIncrementer()).listener(listener())
.flow(personPorcessStep(someParam)).end().build();
}
private Step personPorcessStep(String someParam) throws Exception {
return stepBuilderFactory.get("personProcessStep").<PersonInput, PersonOutput>chunk(1)
.reader(new PersonReader(someParam)).faultTolerant().
skipPolicy(new DataDuplicateSkipper()).processor(new PersonProcessor())
.writer(new PersonWriter()).build();
}
private JobExecutionListener listener() {
return new PersonJobCompletionListener();
}
private class PersonInput {
String firstName;
public PersonInput(String firstName) {
this.firstName = firstName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
}
private class PersonOutput {
String firstName;
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
}
public class PersonReader implements ItemReader<PersonInput> {
private List<PersonInput> items;
private int count = 0;
public PersonReader(String someParam) throws InterruptedException {
Thread.sleep(10000L); //to simulate processing
//manipulate and provide data in the read method
//just for testing i have given some dummy example
items = new ArrayList<PersonInput>();
PersonInput pi = new PersonInput("john");
items.add(pi);
}
@Override
public PersonInput read() {
if (count < items.size()) {
return items.get(count++);
}
return null;
}
}
public class DataDuplicateSkipper implements SkipPolicy {
@Override
public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
if (exception instanceof DataIntegrityViolationException) {
return true;
}
return true;
}
}
private class PersonProcessor implements ItemProcessor<PersonInput, PersonOutput> {
@Override
public PersonOutput process(PersonInput item) throws Exception {
return null;
}
}
private class PersonWriter implements org.springframework.batch.item.ItemWriter<PersonOutput> {
@Override
public void write(List<? extends PersonOutput> results) throws Exception {
return;
}
}
private class PersonJobCompletionListener implements JobExecutionListener {
public PersonJobCompletionListener() {
}
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("JOB COMPLETED");
}
}
}
Основная функция
@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
@EnableAsync
public class SpringBatchTestApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchTestApplication.class, args);
}
}
Я использую конфигурации на основе аннотаций и использую Gradle с пакетом ниже.
compile('org.springframework.boot:spring-boot-starter-batch')
Пожалуйста, дайте мне знать, если потребуется дополнительная информация. Я не смог найти ни одного примера для запуска этого общего варианта использования.
Спасибо за ваше время.
7 ответов
Попробуйте, в вашей конфигурации вам нужно создать customJobLauncher с SimpleAsyncTaskExecutor с помощью @Bean(name = "myJobLauncher"), и то же самое будет использоваться @Qualifier в вашем контроллере.
@Bean(name = "myJobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
В вашем контроллере
@Autowired
@Qualifier("myJobLauncher")
private JobLauncher jobLauncher;
Если я посмотрю на ваш код, я вижу пару ошибок. Прежде всего, ваша настраиваемая конфигурация не загружается, потому что в противном случае внедрение не удастся для повторяющегося экземпляра bean-компонента для того же интерфейса.
В весенней загрузке много волшебства, но если вы не скажете ему про сканирование компонентов, ничего не будет загружено должным образом.
Вторая проблема, которую я вижу, - это ваш класс BatchConfig: он не расширяет DefaultBatchConfigure и не отменяет getJobLauncher(), поэтому, даже если магия загрузки загрузит все, вы получите значение по умолчанию. Вот конфигурация, которая будет работать, и она соответствует документации @EnableBatchProcessing API.
BatchConfig
@Configuration
@EnableBatchProcessing(modular = true)
@Slf4j
public class BatchConfig extends DefaultBatchConfigurer {
@Override
@Bean
public JobLauncher getJobLauncher() {
try {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
} catch (Exception e) {
log.error("Can't load SimpleJobLauncher with SimpleAsyncTaskExecutor: {} fallback on default", e);
return super.getJobLauncher();
}
}
}
Основная функция
@SpringBootApplication
@EnableScheduling
@EnableAsync
@ComponentScan(basePackageClasses = {BatchConfig.class})
public class SpringBatchTestApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchTestApplication.class, args);
}
}
Хотя у тебя твой обычай jobLauncher
вы выполняете задание по умолчанию jobLauncher
предоставлено Spring. Не могли бы вы, пожалуйста, AutoWire simpleJobLauncher
в вашем контроллере и попробовать?
Я знаю, что это старый вопрос, но я все равно отправляю его для будущих пользователей.
После просмотра вашего кода я не могу сказать, почему у вас возникла эта проблема, но я могу предложить вам использовать аннотацию Qualifier, а также использовать ThreadPoolTaskExecutor, подобный этому, и посмотреть, решит ли это вашу проблему.
Вы также можете проверить это руководство: Асинхронная обработка пакетных заданий Spring для получения дополнительных сведений. Это поможет вам настроить асинхронное пакетное задание Spring. Это руководство написано мной.
@Configuration
public class BatchConfig {
@Autowired
private JobRepository jobRepository;
@Bean
public TaskExecutor threadPoolTaskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(12);
executor.setCorePoolSize(8);
executor.setQueueCapacity(15);
return executor;
}
@Bean
public JobLauncher asyncJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(threadPoolTaskExecutor());
return jobLauncher;
}
}
JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
, Joblauncher будет ждать после завершения задания, прежде чем что-либо возвращать, поэтому ваш сервис, вероятно, займет много времени, чтобы ответить, если это ваша проблема. Если вы хотите асинхронные возможности, вы можете посмотреть на Spring @EnableAsync
& @Async
,
В соответствии с весенней документацией для возврата ответа http-запроса асинхронно необходимо использовать org.springframework.core.task.SimpleAsyncTaskExecutor.
Любая реализация весеннего интерфейса TaskExecutor может использоваться для управления асинхронным выполнением заданий.
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
</property>
Если вы используете Ломбок, это может вам помочь:
TLDR: Ломбок, похоже, плохо работает с аннотациями РЕДАКТИРОВАТЬ: если вы включили аннотации в
lombok.config
файл, чтобы можно было использовать
@Qualifier
вот так:
lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier
Я знаю старый вопрос, однако у меня была точно такая же проблема, и ни один из ответов не решил ее.
Я настроил средство запуска асинхронных заданий следующим образом и добавил квалификатор, чтобы убедиться, что этот jobLauncher внедряется:
@Bean(name = "asyncJobLauncher")
public JobLauncher simpleJobLauncher(JobRepository jobRepository) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
И ввел это так
@Qualifier("asyncJobLauncher")
private final JobLauncher jobLauncher;
Я использовал Ломбок
@AllArgsConstructor
после изменения его на autowire была введена правильная программа запуска заданий, и теперь задание выполняется асинхронно:
@Autowired
@Qualifier("asyncJobLauncher")
private JobLauncher jobLauncher;
Также мне не пришлось расширять свою конфигурацию с
DefaultBatchConfigurer