Чтобы отделить класс шагов в весенней партии

Я пытался найти решение, но не могу... ã… ã…

Я хочу разделить этапы работы, как показано ниже.

step1.class -> step2.class -> step3.class -> done

Причина, по которой я так разделен, заключается в том, что мне приходится использовать запросы на каждом этапе.

    @Bean
    public Job bundleJob() {
        return jobBuilderFactory.get(JOB_NAME)
                .start(step1) // bean 
                .next(step2) // bean
                .next(step3()) // and here is the code ex) reader, processor, writer
                .build();
    }

Моя цель - использовать возвращаемые данные на шаге 1, шаг 2. но jpaItemReader похож на async ... поэтому он не обрабатывается, как указано выше.
поток отладки, подобный этому.

readerStep1 -> writerStep1 -> readerStep2 -> readerWriter2 -> readerStep3 -> writerStep3 
and
-> processorStep1 -> processorStep2 -> processorStep3

это большая проблема для меня...
Как я могу дождаться каждого шага в работе? Включая запросы.

2 ответа

Ага! Я понял.
Дело в создании bean-компонентов в конфигурации.
Я написал аннотационный bean-компонент для всевозможных шагов, чтобы они были созданы весной.

решение является поздним связыванием, как @JobScope или @StepScope

    @Bean
    @StepScope. // late creating bean.
    public ListItemReader<Dto> itemReader() {
        // business logic
        return new ListItemReader<>(dto);
    }

Чтобы иметь отдельные шаги в вашей работе, вы можете использовать Flow с TaskletStep. Поделившись фрагментом для справки,

@Bean
public Job processJob() throws Exception {

    Flow fetchData = (Flow) new FlowBuilder<>("fetchData")
            .start(fetchDataStep()).build();

    Flow transformData = (Flow) new FlowBuilder<>("transformData")
            .start(transformData()).build();

    Job job = jobBuilderFactory.get("processTenantLifeCycleJob").incrementer(new RunIdIncrementer())
    .start(fetchData).next(transformData).next(processData()).end()
    .listener(jobCompletionListener()).build();

    ReferenceJobFactory referenceJobFactory = new ReferenceJobFactory(job);
    registry.register(referenceJobFactory);

    return job;
}


@Bean
public TaskletStep fetchDataStep() {
    return stepBuilderFactory.get("fetchData")
            .tasklet(fetchDataValue()).listener(fetchDataStepListener()).build();
}

@Bean
@StepScope
public FetchDataValue fetchDataValue() {
    return new FetchDataValue();
}

@Bean
public TaskletStep transformDataStep() {
    return stepBuilderFactory.get("transformData")   
    .tasklet(transformValue()).listener(sendReportDataCompletionListener()).build();
}

@Bean
@StepScope
public TransformValue transformValue() {
    return new TransformValue();
}

@Bean
public Step processData() {
    return stepBuilderFactory.get("processData").<String, Data>chunk(chunkSize)
            .reader(processDataReader()).processor(dataProcessor()).writer(processDataWriter())
            .listener(processDataListener())
            .taskExecutor(backupTaskExecutor()).build();
}

В этом примере я использовал 2 потока для выборки и преобразования данных, которые будут выполнять данные из класса.

Чтобы вернуть значения из шагов 1 и 2, вы можете сохранить значение в контексте задания и получить его на шаге ProcessData, который имеет считыватель, процессор и модуль записи.

Другие вопросы по тегам