Чтение ресурса S3 с проблемой ItemReaders в SpringBatch

У меня есть Spring Batch Job, который читает кучу файлов из S3 Bucket, обрабатывает их и затем отправляет в базу данных, делая это в многопоточной конфигурации. application.properties Файл содержит это:

cloud.aws.credentials.accessKey=accessKey 
cloud.aws.credentials.secretKey=secret
cloud.aws.region.static=us-east-1
cloud.aws.credentials.instanceProfile=true 
cloud.aws.stack.auto=false

Мой ItemReader:

@Bean
ItemReader<DataRecord> itemReader() {
    FlatFileItemReader<DataRecord> flatFileItemReader = new FlatFileItemReader<>();
    flatFileItemReader.setLinesToSkip(0);
    flatFileItemReader.setLineMapper(new DataRecord.DataRecordLineMapper());
    flatFileItemReader.setSaveState(false);

    MultiResourceItemReader<DataRecord> multiResourceItemReader = new MultiResourceItemReader<>();
    multiResourceItemReader.setDelegate(flatFileItemReader);
    multiResourceItemReader.setResources(loadS3Resources(null, null));
    multiResourceItemReader.setSaveState(false);

    SynchronizedItemStreamReader<DataRecord> itemStreamReader = new SynchronizedItemStreamReader<>();
    itemStreamReader.setDelegate(multiResourceItemReader);
    return itemStreamReader;
}

И мой TaskExecutor:

@Bean
TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
    return threadPoolTaskExecutor;
}

Задание состоит только из одного шага, когда он читает из файлов, обрабатывает их и затем записывает в БД. При такой конфигурации ресурсы загружаются, задание запускается, и шаг выполняет обработку для ~240 тыс. Первых строк первого ресурса (имеется 7 ресурсов, каждый из которых содержит 1,2 млн строк). Тогда я получаю следующее исключение:

org.springframework.batch.item.file.NonTransientFlatFileException: Unable to read from resource: [Amazon s3 resource [bucket='my-bucket' and object='output/part-r-00000']]
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:220) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readFromDelegate(MultiResourceItemReader.java:140) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readNextItem(MultiResourceItemReader.java:119) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.read(MultiResourceItemReader.java:108) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.support.SynchronizedItemStreamReader.read(SynchronizedItemStreamReader.java:55) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:91) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:157) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:116) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:110) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) ~[spring-tx-4.3.9.RELEASE.jar!/:4.3.9.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_65]
Caused by: javax.net.ssl.SSLException: SSL peer shut down incorrectly
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:596) ~[na:1.8.0_65]
at sun.security.ssl.InputRecord.read(InputRecord.java:532) ~[na:1.8.0_65]
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973) ~[na:1.8.0_65]
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930) ~[na:1.8.0_65]
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) ~[na:1.8.0_65]
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:198) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135) ~[httpclient-4.5.3.jar!/:4.5.3]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:117) ~[aws-java-sdk-s3-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) ~[na:1.8.0_65]
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) ~[na:1.8.0_65]
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[na:1.8.0_65]
at java.io.InputStreamReader.read(InputStreamReader.java:184) ~[na:1.8.0_65]
at java.io.BufferedReader.fill(BufferedReader.java:161) ~[na:1.8.0_65]
at java.io.BufferedReader.readLine(BufferedReader.java:324) ~[na:1.8.0_65]
at java.io.BufferedReader.readLine(BufferedReader.java:389) ~[na:1.8.0_65]
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:201) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
... 23 common frames omitted

Я хотел бы знать, есть ли простой способ исправить это. В настоящее время я думаю просто сделать локальную копию файлов и затем прочитать их, но я хотел бы знать, можно ли избежать этого исключения с помощью какой-либо конфигурации.

Спасибо!

1 ответ

Решение

Я предполагаю, что один поток закрывает сессию SFTP, в то время как другой поток все еще обрабатывает.

Лучше использовать MultiResourcePartitioner для создания раздела для ресурса (файла), а затем заставить читателя подобрать каждый файл отдельно как отдельный раздел. При такой конфигурации вам больше не нужен MultiResourceItemReader (вы можете перейти прямо к делегату).

Смотрите пример здесь https://github.com/spring-projects/spring-batch/blob/master/spring-batch-samples/src/main/resources/jobs/partitionFileJob.xml

Также см. Как применить секционированный счетчик для MultiResourceItemReader?

Другие вопросы по тегам