Использование AWS Java S3 SDK TransferManager для возобновления загрузки из потока SFTP

В настоящее время я запускаю загрузку с SFTP-сервера в S3 с помощью AWS TransferManager в Java S3 SDK. Ниже приведен способ запуска этой загрузки:

(псевдокод...)

    @Autowired
    TransferManager transferManager;

    @Autowired
    SftpStreamFactory sftpStreamFactory;

    SftpStream sftpStream = sftpStreamFactory.createStream(filePath);
    ObjectMetadata objectMetadata = new ObjectMetadata();
    objectMetadata.setContentLength(sftpStream.getSizeBytes());
    PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, sftpStream.getStream(), objectMetadata);
    putObjectRequest.setGeneralProgressListener(new UploadBeginEndNotificationListener(uploadRequest, statusNotifier));
    
    transferManager.upload(putObjectRequest);

и вот определение для SftpStream:

@AllArgsConstructor
public class SftpStreamFactory {

@Getter
@AllArgsConstructor
public static class SftpStream {
    private final long sizeBytes;
    private final InputStream stream;
}

private final SftpRemoteFileTemplate sftpTemplate;
private final SftpProperties sftpProperties;

public SftpStream createStream(Path relativePath) {
    return sftpTemplate.<SftpStream, ChannelSftp>executeWithClient(session -> createStream(session, relativePath));
}

SftpStream createStream(ChannelSftp channelSftp, Path relativePath) {

    String path = sftpProperties.getRoot().resolve(relativePath).toString();

    try {
        SftpATTRS fileAttrs = channelSftp.lstat(path);
        long size = fileAttrs.getSize();
        return new SftpStream(size, channelSftp.get(path));
    }
    catch (SftpException e) {
        throw new UncheckedIOException(new NestedIOException("SFTP Error", e));
    }
}

}

Этот метод загрузки работает нормально. Однако, если многокомпонентная загрузка приостановлена ​​/ отменена / в противном случае прервана в середине, мы хотели бы продолжить с того места, на котором остановились, а не перезапускать заново. Мы знаем о TransferManagerresumeUpload метод, который требует PersistableUpload.

Однако в javadoc для PersistableUpload, ожидается file путь, который нужно передать в конструктор, а затем пытается создать Fileобъект из него:https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/transfer/PersistableUpload.html

Нам интересно, есть ли способ возобновить загрузку без этого файлового объекта, который мы не можем получить из нашего ChannelSftp? То есть можем ли мы возобновить загрузку из потока вместо файла? Или нам придется переключиться на использование API низкого уровня s3 для выполнения такого резюме. Любые предложения приветствуются.

Изменить - посмотрел немного больше и даже передав UploadId для уже существующей загрузки, метод doUpload выдаст исключение, если файла нет. Есть идеи?

1 ответ

Ответ: нет, вы не можете возобновить загрузку без файла, но есть обходной путь для вашего аналогичного случая:

# УДАЛЕНИЕ ПРИОСТАНОВКИ ПЕРЕД АБОРТОМ

  1. Если и только если до того, как произошло какое-либо прерывание, попробуйте приостановить соединение
boolean forceCancel = true;
PauseResult<PersistableUpload> pauseResult = myUpload.tryPause(forceCancel);
  1. Сохранить info to resume данные в файл
PersistableUpload persistableUpload = pauseResult.getInfoToResume();

File f = new File("UNIQUE-ID-FOR-UPLOADED-FILE"); //blob
if (!f.exists())
    f.createNewFile();
FileOutputStream fos = new FileOutputStream(f);

// Serialize the persistable upload to the file.
persistableUpload.serialize(fos);
fos.close();
  1. Продолжить иногда позже
TransferManager tm = new TransferManager();
FileInputStream fis = new FileInputStream(new File("UNIQUE-ID-FOR-UPLOADED-FILE"));

// Deserialize PersistableUpload information from disk.
PersistableUpload persistableUpload = PersistableTransfer.deserializeFrom(fis);

// Call resumeUpload with PersistableUpload.
tm.resumeUpload(persistableUpload);

fis.close();

#SAVE STREAM ДЛЯ ПОДДЕРЖКИ ФАЙЛА, ЕСЛИ ЧТО-ТО ПРОИЗОШЛО (например, сбой JVM)

Использовать S3SyncProgressListener к TransferManager#upload сохранять каждое изменение и сериализовать данные на диск.

transferManager.upload(putObjectRequest, new S3SyncProgressListener() {

    ExecutorService executor = Executors.newFixedThreadPool(1);

    @Override
    public void onPersistableTransfer(final PersistableTransfer persistableTransfer) {

       executor.submit(new Runnable() {
          @Override
          public void run() {
              try {
                  File f = new File("UNIQUE-ID-FOR-UPLOADED-FILE");
                  if (!f.exists()) {
                      f.createNewFile();
                  }
                  FileOutputStream fos = new FileOutputStream(f);
                  persistableTransfer.serialize(fos);
                  fos.close();
              } catch (IOException e) {
                  throw new RuntimeException("Unable to persist transfer to disk.", e);
              }
          }
       });
    }
});

Надеюсь, это поможет.

Другие вопросы по тегам