Spring Integration Java DSL flow Splitter/Aggregator удаляет файл после обработки всех строк

Используя Spring Integration Java DSL, я создал поток, в котором я обрабатываю файлы синхронно с FileSplitter, Я был в состоянии использовать setDeleteFiles флаг на AbstractFilePayloadTransformer удалить файл после преобразования каждой строки в File к Message для последующей обработки, вот так:

@Bean
protected IntegrationFlow s3ChannelFlow() {
    // do not exhaust filesystem w/ files downloaded from S3
    FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
    transformer.setDeleteFiles(true);

    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
    // @formatter:off
    return IntegrationFlows
        .from(s3Channel())
        .channel(StatsUtil.createRunStatsChannel(runStatsRepository))
        .transform(transformer)
        .split(new FileSplitter())
        .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
        .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
        .get();
    // @formatter:on
}

Это работает хорошо, но медленно. Поэтому я пытаюсь добавить ExecutorChannel после .split выше, вот так:

.channel(c -> c.executor(Executors.newFixedThreadPool(10)))

Но тогда вышеупомянутый флаг удаления не позволяет потоку успешно завершить удаление файла (ов) до того, как они будут полностью прочитаны.

Если я уберу флаг, у меня будет возможность исчерпать локальную файловую систему, где файлы были синхронизированы с S3.

Что я мог бы представить выше, чтобы а) полностью обработать каждый файл и б) удалить файл из локальной файловой системы после завершения? Другими словами, есть ли способ узнать точно, когда файл полностью обрабатывается (когда его строки обрабатываются асинхронно через потоки в пуле)?

Если вам интересно, вот мой вывод FileToInputStreamTransformer:

public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

    @Override
    // @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
    protected InputStream transformFile(File payload) throws Exception {
        return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
    }
}

ОБНОВИТЬ

Итак, как что-то в нисходящем потоке знает, что просить?

Кстати, если я правильно следую вашему совету, когда я обновляю .split с new FileSplitter(true, true) выше я получаю

2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: не удалось преобразовать сообщение; вложенное исключение - java.lang.IllegalArgumentException: аргумент 'json' должен быть экземпляром: [class java.lang.String, class [B, класс java.io.File, класс java.net.URL, класс java.io.InputStream, класс java.io.Reader], но получил: класс org.springframework.integration.file.splitter.FileSplitter$FileMarker
    в org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)

2 ответа

Решение

Спасибо Артем.

Мне удалось решить проблему, но, возможно, в более тяжелой манере.

Представляя ExecutorChannel вызвал целый ряд изменений в реализации, в том числе: отключение setDeleteFiles флаг на AbtractFilePayloadTransformer, обновление JPA @Entity, RunStats и хранилище для этого, чтобы захватить статус обработки файла, а также статус обработки для всего прогона. Взятые вместе обновления статуса обработки позволяют потоку знать, когда удалять файлы из локальной файловой системы (т.е. когда они полностью обработаны) и возвращать статус в /stats/{run} конечная точка, чтобы пользователь мог знать, когда запуск завершен.

Вот отрывки из моей реализации (если кому-то любопытно)...

class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

@Override
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
protected InputStream transformFile(File payload) throws Exception {
    return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
}
}

public class RunStatsHandler extends AbstractMessageHandler {

private final SplunkSlf4jLogger log = new SplunkSlf4jLogger(LoggerFactory.getLogger(getClass()));
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

private final RunStatsRepository runStatsRepository;

public RunStatsHandler(RunStatsRepository runStatsRepository) {
    this.runStatsRepository = runStatsRepository;
}

// Memory efficient routine, @see http://www.baeldung.com/java-read-lines-large-file
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
    RunStats runStats = message.getHeaders().get(RunStats.RUN, RunStats.class);
    String token = message.getHeaders().get(RunStats.FILE_TOKEN, String.class);
    if (runStats != null) {
        File compressedFile = (File) message.getPayload();
        String compressedFileName = compressedFile.getCanonicalPath();
        LongAdder lineCount = new LongAdder();
        // Streams and Scanner implement java.lang.AutoCloseable
        InputStream fs = new FileInputStream(compressedFile);
        InputStream gzfs = new GZIPInputStream(fs, BUFFER_SIZE);
        try (Scanner sc = new Scanner(gzfs, "UTF-8")) {
            while (sc.hasNextLine()) {
                sc.nextLine();
                lineCount.increment();
            }
            // note that Scanner suppresses exceptions
            if (sc.ioException() != null) {
                log.warn("file.lineCount", ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, 
                        "exception", sc.ioException().getMessage()));
                throw sc.ioException();
            }
            runStats.addFile(compressedFileName, token, lineCount.longValue());
            runStatsRepository.updateRunStats(runStats);
            log.info("file.lineCount",
                    ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, "lineCount", lineCount.intValue()));
        }
    }
}

}

Обновленный поток

@Bean
protected IntegrationFlow s3ChannelFlow() {
    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
    // @formatter:off
    return IntegrationFlows
        .from(s3Channel())
        .enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
        .channel(runStatsChannel())
        .channel(c -> c.executor(Executors.newFixedThreadPool(persistencePoolSize)))
        .transform(new FileToInputStreamTransformer())
        .split(new FileSplitter())
        .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
        .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
        .get();
    // @formatter:on
}

@Bean
public MessageChannel runStatsChannel() {
    DirectChannel wiretapChannel = new DirectChannel();
    wiretapChannel.subscribe(new RunStatsHandler(runStatsRepository));
    DirectChannel loggingChannel = new DirectChannel();
    loggingChannel.addInterceptor(new WireTap(wiretapChannel));
    return loggingChannel;
}

К сожалению, я не могу поделиться RunStats и репо реализации.

FileSplitter имеет markers Вариант именно для этой цели:

Установите в значение true, чтобы отправлять сообщения начала / конца файла маркера до и после данных файла. Маркеры - это сообщения с FileSplitter.FileMarker полезные нагрузки (с START а также END значения в свойстве mark). Маркеры могут использоваться при последовательной обработке файлов в нисходящем потоке, где некоторые строки фильтруются. Они позволяют последующей обработке знать, когда файл был полностью обработан. END Маркер включает в себя количество строк. По умолчанию: false, когда true, apply-sequence является false по умолчанию.

Вы можете использовать его в нисходящем потоке, чтобы определить, можете ли вы удалить файл уже или нет.

Другие вопросы по тегам