Как вызвать Spring Spring из Spring интеграции Java Java DSL файл интеграции файлов интеграции потоков

Как запустить пакетное задание пружины из интеграции пружин, используя java dsl Integrationflows.

У меня есть приведенный ниже код, который опрашивает файл в каталоге, в тот момент, когда в него добавляется новый файл, генерируется сообщение, и я хочу запустить пакетное задание Spring в этом экземпляре. Пожалуйста посоветуй.

@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                              @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                              TaskExecutor taskExecutor,
                                              MessageSource<File> fileReadingMessageSource) {

    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)))
              .transform(Transformers.fileToString())
                     .channel(ApplicationConfiguration.INBOUND_CHANNEL)                 

             .get();
}

2 ответа

Ниже мой код:-

@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                              @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                              TaskExecutor taskExecutor,
                                              MessageSource<File> fileReadingMessageSource,
                                              JobLaunchingGateway jobLaunchingGateway) {

    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)))
              .handle(fileMessageToJobRequest(),"toRequest")
                       .handle(jobLaunchingGateway)
                        .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                             .get();
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
  //  fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
  //  simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

В Справочном руководстве Spring Batch есть чистый образец по этому вопросу:

@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            handle(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}
Другие вопросы по тегам