Как запустить Spring Batch Job асинхронно

Я следовал за весенним пакетным документом и не мог выполнить свою работу Асинхронно.

Поэтому я запускаю задание из веб-контейнера, и задание будет запущено через конечную точку REST.

Я хотел получить идентификатор JobInstance, чтобы передать его в ответ, прежде чем завершить всю работу. Таким образом, они могут проверить статус задания позже с идентификатором JobInstance вместо ожидания. Но я не мог заставить это работать. Ниже приведен пример кода, который я пробовал. Пожалуйста, дайте мне знать, что я пропустил или неправильно.

BatchConfig, чтобы сделать Async JobLauncher

@Configuration
public class BatchConfig {

    @Autowired
    JobRepository jobRepository;


    @Bean
    public JobLauncher simpleJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}

контроллер

@Autowired
JobLauncher jobLauncher;

@RequestMapping(value="/trigger-job", method = RequestMethod.GET)
public Long workHard() throws Exception {
    JobParameters jobParameters = new JobParametersBuilder().
            addLong("time", System.currentTimeMillis())
            .toJobParameters();
    JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
    System.out.println(jobExecution.getJobInstance().getInstanceId());
    System.out.println("OK RESPONSE");
    return jobExecution.getJobInstance().getInstanceId();
}

И JobBuilder как компонент

@Component
public class BatchComponent {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    public Job customJob(String someParam) throws Exception {
        return jobBuilderFactory.get("personProcessor")
                .incrementer(new RunIdIncrementer()).listener(listener())
                .flow(personPorcessStep(someParam)).end().build();
    }


    private Step personPorcessStep(String someParam) throws Exception {
        return stepBuilderFactory.get("personProcessStep").<PersonInput, PersonOutput>chunk(1)
                .reader(new PersonReader(someParam)).faultTolerant().
                        skipPolicy(new DataDuplicateSkipper()).processor(new PersonProcessor())
                .writer(new PersonWriter()).build();
    }


    private JobExecutionListener listener() {
        return new PersonJobCompletionListener();
    }

    private class PersonInput {
        String firstName;

        public PersonInput(String firstName) {
            this.firstName = firstName;
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    private class PersonOutput {
        String firstName;

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    public class PersonReader implements ItemReader<PersonInput> {
        private List<PersonInput> items;
        private int count = 0;

        public PersonReader(String someParam) throws InterruptedException {
            Thread.sleep(10000L); //to simulate processing
            //manipulate and provide data in the read method
            //just for testing i have given some dummy example
            items = new ArrayList<PersonInput>();
            PersonInput pi = new PersonInput("john");
            items.add(pi);
        }

        @Override
        public PersonInput read() {
            if (count < items.size()) {
                return items.get(count++);
            }
            return null;
        }
    }


    public class DataDuplicateSkipper implements SkipPolicy {

        @Override
        public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
            if (exception instanceof DataIntegrityViolationException) {
                return true;
            }
            return true;
        }
    }


    private class PersonProcessor implements ItemProcessor<PersonInput, PersonOutput> {

        @Override
        public PersonOutput process(PersonInput item) throws Exception {
            return null;
        }
    }

    private class PersonWriter implements org.springframework.batch.item.ItemWriter<PersonOutput> {
        @Override
        public void write(List<? extends PersonOutput> results) throws Exception {
            return;
        }
    }

    private class PersonJobCompletionListener implements JobExecutionListener {
        public PersonJobCompletionListener() {
        }

        @Override
        public void beforeJob(JobExecution jobExecution) {

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("JOB COMPLETED");
        }
    }
}

Основная функция

@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
@EnableAsync
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}

Я использую конфигурации на основе аннотаций и использую Gradle с пакетом ниже.

compile('org.springframework.boot:spring-boot-starter-batch')

Пожалуйста, дайте мне знать, если потребуется дополнительная информация. Я не смог найти ни одного примера для запуска этого общего варианта использования.

Спасибо за ваше время.

7 ответов

Попробуйте, в вашей конфигурации вам нужно создать customJobLauncher с SimpleAsyncTaskExecutor с помощью @Bean(name = "myJobLauncher"), и то же самое будет использоваться @Qualifier в вашем контроллере.

@Bean(name = "myJobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

В вашем контроллере

@Autowired
@Qualifier("myJobLauncher")
private JobLauncher jobLauncher;

Если я посмотрю на ваш код, я вижу пару ошибок. Прежде всего, ваша настраиваемая конфигурация не загружается, потому что в противном случае внедрение не удастся для повторяющегося экземпляра bean-компонента для того же интерфейса.

В весенней загрузке много волшебства, но если вы не скажете ему про сканирование компонентов, ничего не будет загружено должным образом.

Вторая проблема, которую я вижу, - это ваш класс BatchConfig: он не расширяет DefaultBatchConfigure и не отменяет getJobLauncher(), поэтому, даже если магия загрузки загрузит все, вы получите значение по умолчанию. Вот конфигурация, которая будет работать, и она соответствует документации @EnableBatchProcessing API.

BatchConfig

@Configuration
@EnableBatchProcessing(modular = true)
@Slf4j
public class BatchConfig extends DefaultBatchConfigurer {

  @Override
  @Bean
  public JobLauncher getJobLauncher() {
    try {
      SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
      jobLauncher.setJobRepository(getJobRepository());
      jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
      jobLauncher.afterPropertiesSet();
      return jobLauncher;

    } catch (Exception e) {
      log.error("Can't load SimpleJobLauncher with SimpleAsyncTaskExecutor: {} fallback on default", e);
      return super.getJobLauncher();
    }
  }
}

Основная функция

@SpringBootApplication
@EnableScheduling
@EnableAsync
@ComponentScan(basePackageClasses = {BatchConfig.class})
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}

Хотя у тебя твой обычай jobLauncherвы выполняете задание по умолчанию jobLauncher предоставлено Spring. Не могли бы вы, пожалуйста, AutoWire simpleJobLauncher в вашем контроллере и попробовать?

Я знаю, что это старый вопрос, но я все равно отправляю его для будущих пользователей.

После просмотра вашего кода я не могу сказать, почему у вас возникла эта проблема, но я могу предложить вам использовать аннотацию Qualifier, а также использовать ThreadPoolTaskExecutor, подобный этому, и посмотреть, решит ли это вашу проблему.

Вы также можете проверить это руководство: Асинхронная обработка пакетных заданий Spring для получения дополнительных сведений. Это поможет вам настроить асинхронное пакетное задание Spring. Это руководство написано мной.

@Configuration
public class BatchConfig {

 @Autowired
 private JobRepository jobRepository;

 @Bean
 public TaskExecutor threadPoolTaskExecutor(){

  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(12);
        executor.setCorePoolSize(8);
        executor.setQueueCapacity(15);

   return executor;
 }

 @Bean
    public JobLauncher asyncJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();

        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(threadPoolTaskExecutor());
        return jobLauncher;
 }
}

JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);, Joblauncher будет ждать после завершения задания, прежде чем что-либо возвращать, поэтому ваш сервис, вероятно, займет много времени, чтобы ответить, если это ваша проблема. Если вы хотите асинхронные возможности, вы можете посмотреть на Spring @EnableAsync & @Async,

@EnableAsync

В соответствии с весенней документацией для возврата ответа http-запроса асинхронно необходимо использовать org.springframework.core.task.SimpleAsyncTaskExecutor.

Любая реализация весеннего интерфейса TaskExecutor может использоваться для управления асинхронным выполнением заданий.

весенняя партия документации

<bean id="jobLauncher"
  class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
</property>

Если вы используете Ломбок, это может вам помочь:

TLDR: Ломбок, похоже, плохо работает с аннотациями РЕДАКТИРОВАТЬ: если вы включили аннотации в lombok.configфайл, чтобы можно было использовать @Qualifierвот так:

      lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier

Я знаю старый вопрос, однако у меня была точно такая же проблема, и ни один из ответов не решил ее.

Я настроил средство запуска асинхронных заданий следующим образом и добавил квалификатор, чтобы убедиться, что этот jobLauncher внедряется:

       @Bean(name = "asyncJobLauncher")
 public JobLauncher simpleJobLauncher(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

И ввел это так

      @Qualifier("asyncJobLauncher")
private final JobLauncher jobLauncher;

Я использовал Ломбок @AllArgsConstructorпосле изменения его на autowire была введена правильная программа запуска заданий, и теперь задание выполняется асинхронно:

      @Autowired
@Qualifier("asyncJobLauncher")
private JobLauncher jobLauncher;

Также мне не пришлось расширять свою конфигурацию с DefaultBatchConfigurer

Другие вопросы по тегам