Как распаковать большие файлы, используя Zstd-jni и Byte Buffers

Я пытаюсь распаковать много файлов размером более 40 МБ, поскольку я загружаю их параллельно, используя ByteBuffers и Channels. Я получаю лучшую пропускную способность при использовании каналов, чем при использовании потоков, и нам нужна эта система с очень высокой пропускной способностью, поскольку нам необходимо обрабатывать 40 ТБ файлов каждый день, и эта часть процесса в настоящее время является узким местом. Файлы сжимаются с помощью zstd-jni. У Zstd-jni есть API для распаковки байтовых буферов, но я получаю сообщение об ошибке при их использовании. Как мне распаковать байтовый буфер за раз, используя zstd-jni?

Я нашел эти примеры в их тестах, но если я что-то упустил, примеры, использующие ByteBuffers, по-видимому, предполагают, что весь входной файл помещается в один ByteBuffer: https://github.com/luben/zstd-jni/blob/master/src/test/scala/Zstd.scala

Ниже приведен мой код для сжатия и распаковки файлов. Код сжатия работает отлично, но код распаковки завершается с ошибкой -70.

public static long compressFile(String inFile, String outFolder, ByteBuffer inBuffer, ByteBuffer compressedBuffer, int compressionLevel) throws IOException {
    File file = new File(inFile);
    File outFile = new File(outFolder, file.getName() + ".zs");
    long numBytes = 0l;

    try (RandomAccessFile inRaFile = new RandomAccessFile(file, "r");
        RandomAccessFile outRaFile = new RandomAccessFile(outFile, "rw");
                FileChannel inChannel = inRaFile.getChannel();
                FileChannel outChannel = outRaFile.getChannel()) {
        inBuffer.clear();
        while(inChannel.read(inBuffer) > 0) {
            inBuffer.flip();
            compressedBuffer.clear();

            long compressedSize = Zstd.compressDirectByteBuffer(compressedBuffer, 0, compressedBuffer.capacity(), inBuffer, 0, inBuffer.limit(), compressionLevel);
            numBytes+=compressedSize;
            compressedBuffer.position((int)compressedSize);
            compressedBuffer.flip();
            outChannel.write(compressedBuffer);
            inBuffer.clear(); 
        }
    }

    return numBytes;
}

public static long decompressFile(String originalFilePath, String inFolder, ByteBuffer inBuffer, ByteBuffer decompressedBuffer) throws IOException {
    File outFile = new File(originalFilePath);
    File inFile = new File(inFolder, outFile.getName() + ".zs");
    outFile = new File(inFolder, outFile.getName());

    long numBytes = 0l;

    try (RandomAccessFile inRaFile = new RandomAccessFile(inFile, "r");
        RandomAccessFile outRaFile = new RandomAccessFile(outFile, "rw");
                FileChannel inChannel = inRaFile.getChannel();
                FileChannel outChannel = outRaFile.getChannel()) {

        inBuffer.clear();

        while(inChannel.read(inBuffer) > 0) {
            inBuffer.flip();
            decompressedBuffer.clear();
            long compressedSize = Zstd.decompressDirectByteBuffer(decompressedBuffer, 0, decompressedBuffer.capacity(), inBuffer, 0, inBuffer.limit());
            System.out.println(Zstd.isError(compressedSize) + " " + compressedSize);
            numBytes+=compressedSize;
            decompressedBuffer.position((int)compressedSize);
            decompressedBuffer.flip();
            outChannel.write(decompressedBuffer);
            inBuffer.clear(); 
        }
    }

    return numBytes;
}

1 ответ

Решение

Да, статические методы, которые вы используете в своем примере, предполагают, что весь сжатый файл помещается в один ByteBuffer. Насколько я понимаю ваши требования, вам нужна распаковка потоковой передачи с помощью ByteBuffers. ZstdDirectBufferDecompressingStream уже обеспечивает это:

https://static.javadoc.io/com.github.luben/zstd-jni/1.3.7-1/com/github/luben/zstd/ZstdDirectBufferDecompressingStream.html

и вот пример, как его использовать (из тестов):

https://github.com/luben/zstd-jni/blob/master/src/test/scala/Zstd.scala

но вы также должны разделить его на подклассы и переопределить метод "refill".

РЕДАКТИРОВАТЬ: вот новый тест, который я только что добавил, который имеет ту же структуру, что и ваш вопрос - перемещение данных по четырем каналам:

https://github.com/luben/zstd-jni/blob/master/src/test/scala/Zstd.scala

Другие вопросы по тегам