Использование AWS Java S3 SDK TransferManager для возобновления загрузки из потока SFTP
В настоящее время я запускаю загрузку с SFTP-сервера в S3 с помощью AWS TransferManager в Java S3 SDK. Ниже приведен способ запуска этой загрузки:
(псевдокод...)
@Autowired
TransferManager transferManager;
@Autowired
SftpStreamFactory sftpStreamFactory;
SftpStream sftpStream = sftpStreamFactory.createStream(filePath);
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentLength(sftpStream.getSizeBytes());
PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, sftpStream.getStream(), objectMetadata);
putObjectRequest.setGeneralProgressListener(new UploadBeginEndNotificationListener(uploadRequest, statusNotifier));
transferManager.upload(putObjectRequest);
и вот определение для SftpStream
:
@AllArgsConstructor
public class SftpStreamFactory {
@Getter
@AllArgsConstructor
public static class SftpStream {
private final long sizeBytes;
private final InputStream stream;
}
private final SftpRemoteFileTemplate sftpTemplate;
private final SftpProperties sftpProperties;
public SftpStream createStream(Path relativePath) {
return sftpTemplate.<SftpStream, ChannelSftp>executeWithClient(session -> createStream(session, relativePath));
}
SftpStream createStream(ChannelSftp channelSftp, Path relativePath) {
String path = sftpProperties.getRoot().resolve(relativePath).toString();
try {
SftpATTRS fileAttrs = channelSftp.lstat(path);
long size = fileAttrs.getSize();
return new SftpStream(size, channelSftp.get(path));
}
catch (SftpException e) {
throw new UncheckedIOException(new NestedIOException("SFTP Error", e));
}
}
}
Этот метод загрузки работает нормально. Однако, если многокомпонентная загрузка приостановлена / отменена / в противном случае прервана в середине, мы хотели бы продолжить с того места, на котором остановились, а не перезапускать заново. Мы знаем о TransferManagerresumeUpload
метод, который требует PersistableUpload
.
Однако в javadoc для PersistableUpload
, ожидается file
путь, который нужно передать в конструктор, а затем пытается создать File
объект из него:https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/transfer/PersistableUpload.html
Нам интересно, есть ли способ возобновить загрузку без этого файлового объекта, который мы не можем получить из нашего ChannelSftp
? То есть можем ли мы возобновить загрузку из потока вместо файла? Или нам придется переключиться на использование API низкого уровня s3 для выполнения такого резюме. Любые предложения приветствуются.
Изменить - посмотрел немного больше и даже передав UploadId для уже существующей загрузки, метод doUpload выдаст исключение, если файла нет. Есть идеи?
1 ответ
Ответ: нет, вы не можете возобновить загрузку без файла, но есть обходной путь для вашего аналогичного случая:
# УДАЛЕНИЕ ПРИОСТАНОВКИ ПЕРЕД АБОРТОМ
- Если и только если до того, как произошло какое-либо прерывание, попробуйте приостановить соединение
boolean forceCancel = true;
PauseResult<PersistableUpload> pauseResult = myUpload.tryPause(forceCancel);
- Сохранить
info to resume
данные в файл
PersistableUpload persistableUpload = pauseResult.getInfoToResume();
File f = new File("UNIQUE-ID-FOR-UPLOADED-FILE"); //blob
if (!f.exists())
f.createNewFile();
FileOutputStream fos = new FileOutputStream(f);
// Serialize the persistable upload to the file.
persistableUpload.serialize(fos);
fos.close();
- Продолжить иногда позже
TransferManager tm = new TransferManager();
FileInputStream fis = new FileInputStream(new File("UNIQUE-ID-FOR-UPLOADED-FILE"));
// Deserialize PersistableUpload information from disk.
PersistableUpload persistableUpload = PersistableTransfer.deserializeFrom(fis);
// Call resumeUpload with PersistableUpload.
tm.resumeUpload(persistableUpload);
fis.close();
#SAVE STREAM ДЛЯ ПОДДЕРЖКИ ФАЙЛА, ЕСЛИ ЧТО-ТО ПРОИЗОШЛО (например, сбой JVM)
Использовать S3SyncProgressListener
к TransferManager#upload
сохранять каждое изменение и сериализовать данные на диск.
transferManager.upload(putObjectRequest, new S3SyncProgressListener() {
ExecutorService executor = Executors.newFixedThreadPool(1);
@Override
public void onPersistableTransfer(final PersistableTransfer persistableTransfer) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
File f = new File("UNIQUE-ID-FOR-UPLOADED-FILE");
if (!f.exists()) {
f.createNewFile();
}
FileOutputStream fos = new FileOutputStream(f);
persistableTransfer.serialize(fos);
fos.close();
} catch (IOException e) {
throw new RuntimeException("Unable to persist transfer to disk.", e);
}
}
});
}
});
Надеюсь, это поможет.