Spring Integration Java DSL flow Splitter/Aggregator удаляет файл после обработки всех строк
Используя Spring Integration Java DSL, я создал поток, в котором я обрабатываю файлы синхронно с FileSplitter
, Я был в состоянии использовать setDeleteFiles
флаг на AbstractFilePayloadTransformer
удалить файл после преобразования каждой строки в File
к Message
для последующей обработки, вот так:
@Bean
protected IntegrationFlow s3ChannelFlow() {
// do not exhaust filesystem w/ files downloaded from S3
FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
transformer.setDeleteFiles(true);
// @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
// @formatter:off
return IntegrationFlows
.from(s3Channel())
.channel(StatsUtil.createRunStatsChannel(runStatsRepository))
.transform(transformer)
.split(new FileSplitter())
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
Это работает хорошо, но медленно. Поэтому я пытаюсь добавить ExecutorChannel
после .split
выше, вот так:
.channel(c -> c.executor(Executors.newFixedThreadPool(10)))
Но тогда вышеупомянутый флаг удаления не позволяет потоку успешно завершить удаление файла (ов) до того, как они будут полностью прочитаны.
Если я уберу флаг, у меня будет возможность исчерпать локальную файловую систему, где файлы были синхронизированы с S3.
Что я мог бы представить выше, чтобы а) полностью обработать каждый файл и б) удалить файл из локальной файловой системы после завершения? Другими словами, есть ли способ узнать точно, когда файл полностью обрабатывается (когда его строки обрабатываются асинхронно через потоки в пуле)?
Если вам интересно, вот мой вывод FileToInputStreamTransformer
:
public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB
@Override
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
protected InputStream transformFile(File payload) throws Exception {
return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
}
}
ОБНОВИТЬ
Итак, как что-то в нисходящем потоке знает, что просить?
Кстати, если я правильно следую вашему совету, когда я обновляю .split
с new FileSplitter(true, true)
выше я получаю
2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: не удалось преобразовать сообщение; вложенное исключение - java.lang.IllegalArgumentException: аргумент 'json' должен быть экземпляром: [class java.lang.String, class [B, класс java.io.File, класс java.net.URL, класс java.io.InputStream, класс java.io.Reader], но получил: класс org.springframework.integration.file.splitter.FileSplitter$FileMarker в org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
2 ответа
Спасибо Артем.
Мне удалось решить проблему, но, возможно, в более тяжелой манере.
Представляя ExecutorChannel
вызвал целый ряд изменений в реализации, в том числе: отключение setDeleteFiles
флаг на AbtractFilePayloadTransformer
, обновление JPA @Entity
, RunStats
и хранилище для этого, чтобы захватить статус обработки файла, а также статус обработки для всего прогона. Взятые вместе обновления статуса обработки позволяют потоку знать, когда удалять файлы из локальной файловой системы (т.е. когда они полностью обработаны) и возвращать статус в /stats/{run}
конечная точка, чтобы пользователь мог знать, когда запуск завершен.
Вот отрывки из моей реализации (если кому-то любопытно)...
class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB
@Override
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
protected InputStream transformFile(File payload) throws Exception {
return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
}
}
public class RunStatsHandler extends AbstractMessageHandler {
private final SplunkSlf4jLogger log = new SplunkSlf4jLogger(LoggerFactory.getLogger(getClass()));
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB
private final RunStatsRepository runStatsRepository;
public RunStatsHandler(RunStatsRepository runStatsRepository) {
this.runStatsRepository = runStatsRepository;
}
// Memory efficient routine, @see http://www.baeldung.com/java-read-lines-large-file
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
RunStats runStats = message.getHeaders().get(RunStats.RUN, RunStats.class);
String token = message.getHeaders().get(RunStats.FILE_TOKEN, String.class);
if (runStats != null) {
File compressedFile = (File) message.getPayload();
String compressedFileName = compressedFile.getCanonicalPath();
LongAdder lineCount = new LongAdder();
// Streams and Scanner implement java.lang.AutoCloseable
InputStream fs = new FileInputStream(compressedFile);
InputStream gzfs = new GZIPInputStream(fs, BUFFER_SIZE);
try (Scanner sc = new Scanner(gzfs, "UTF-8")) {
while (sc.hasNextLine()) {
sc.nextLine();
lineCount.increment();
}
// note that Scanner suppresses exceptions
if (sc.ioException() != null) {
log.warn("file.lineCount", ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName,
"exception", sc.ioException().getMessage()));
throw sc.ioException();
}
runStats.addFile(compressedFileName, token, lineCount.longValue());
runStatsRepository.updateRunStats(runStats);
log.info("file.lineCount",
ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, "lineCount", lineCount.intValue()));
}
}
}
}
Обновленный поток
@Bean
protected IntegrationFlow s3ChannelFlow() {
// @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
// @formatter:off
return IntegrationFlows
.from(s3Channel())
.enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
.channel(runStatsChannel())
.channel(c -> c.executor(Executors.newFixedThreadPool(persistencePoolSize)))
.transform(new FileToInputStreamTransformer())
.split(new FileSplitter())
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
@Bean
public MessageChannel runStatsChannel() {
DirectChannel wiretapChannel = new DirectChannel();
wiretapChannel.subscribe(new RunStatsHandler(runStatsRepository));
DirectChannel loggingChannel = new DirectChannel();
loggingChannel.addInterceptor(new WireTap(wiretapChannel));
return loggingChannel;
}
К сожалению, я не могу поделиться RunStats
и репо реализации.
FileSplitter
имеет markers
Вариант именно для этой цели:
Установите в значение true, чтобы отправлять сообщения начала / конца файла маркера до и после данных файла. Маркеры - это сообщения с
FileSplitter.FileMarker
полезные нагрузки (сSTART
а такжеEND
значения в свойстве mark). Маркеры могут использоваться при последовательной обработке файлов в нисходящем потоке, где некоторые строки фильтруются. Они позволяют последующей обработке знать, когда файл был полностью обработан.END
Маркер включает в себя количество строк. По умолчанию:false
, когдаtrue
,apply-sequence
являетсяfalse
по умолчанию.
Вы можете использовать его в нисходящем потоке, чтобы определить, можете ли вы удалить файл уже или нет.