Как передать несколько ReadableStreams в один WriteStream?

Я имею дело с лимитом брандмауэра, где я могу только POST 10 МБ одновременно. Для обработки больших загрузок я хотел бы использовать что-то вроде http://www.resumablejs.com/, записать несколько фрагментов на диск и в конце рекомбинировать их.

Я сейчас пишу тесты, но что-то в моей реализации не так.

Сначала я разбил файл следующим образом:

    const splitFile = async () => {
      const chunkSize = 1024 * 1024;
      const photo = fs.createReadStream(path.resolve(FIXTURES, 'hello-tron.jpg'));

      // Write to 2 files
      photo.on('readable', function() {
        const file1 = path.resolve(TEMP, 'chunk.jpg.1');
        const file2 = path.resolve(TEMP, 'chunk.jpg.2');
        let data;
        while (data = this.read(chunkSize)) {
          if (!fs.existsSync(file1)) {
            const output1 = fs.createWriteStream(file1);
            output1.write(data);
            output1.close();
            return;
          }
          const output2 = fs.createWriteStream(file2);
          output2.write(data);
          if (data === null) {
            output2.close();
          }
        }
      });

      return new Promise(resolve => {
        photo.on('end', resolve);
      });
    };

Затем я собираю это так:

const recombine = async () => {
  const output = fs.createWriteStream(path.resolve(TEMP, 'recombined.jpg'));
  const file1 = path.resolve(TEMP, 'chunk.jpg.1');
  const file2 = path.resolve(TEMP, 'chunk.jpg.2');
  return new Promise(resolve => {
    const stream1 = fs.createReadStream(file1);
    const stream2 = fs.createReadStream(file2);

    const recombinator = new Recombinator({
      readables: [stream1, stream2]
    });

    stream1.on('readable', () => {
      stream2.on('readable', () => {
        recombinator.pipe(output);
      });
    });

    stream1.on('end', () => {
      stream2.on('end', () => {
        resolve();
      });
    });
  })
};

Это Recombinator учебный класс:

/* Takes multiple readable streams and returns a single
 * readable stream that can be piped to a writable stream
 */
const {Readable} = require('stream');

class Recombinator extends Readable {
  constructor(opts) {
    super({...opts, readables: undefined});
    const self = this;
    self.readables = opts.readables || [];
  }

  _read(size) {
    this.push(this._getChunk(size));
  }

  _getChunk(size) {
    const reader = this.readables.find(r => !r.closed);
    if (!reader) {
      return null;
    }
    const data = reader.read(size);
    if (!data) {
      reader.closed = true;
      return this._getChunk(size);
    }
    return data;
  }
}

module.exports = Recombinator;

Вот оригинальное изображение:

Вот пересобранное изображение:

1 ответ

Решение

Частично проблема заключалась в том, что readable событие вызывается только один раз, но оно запускается каждый раз, когда есть данные для чтения. Вложение обработчиков событий, вероятно, тоже было не лучшим.

Решение, которое я использую, состоит в том, чтобы изменить Recombinator конструктор вроде так:

constructor(opts) {
    super({...opts, readables: undefined});
    const self = this;
    self.readables = opts.readables || [];

    self._readableCount = 0;
    self._endedCount = 0;

    // Attach listeners to know when all readables are open and closed
    self.readables.forEach(r => {
      r.on('readable', () => {
        if (r._markedReadable) {
          return;
        }
        r._markedReadable = true;
        self._readableCount++;
      });
      r.on('end', () => {
        if (r._markedEnded) {
          return;
        }
        r._markedEnded = true;
        self._endedCount++;
      });
    })
  }

и добавление асинхронных методов, чтобы дождаться открытия всех читателей, вот так:

  async ready(retry = 10) {
    if (this._readableCount === this.readables.length) {
      return Promise.resolve();
    }
    if (retry === 0) {
      return Promise.reject(`Timeout waiting for ${this.readables.length} readables to open - got ${this._readableCount}`);
    }
    await delay(500);
    return this.ready(retry - 1);
  }

  async done(retry = 10) {
    if (this._endedCount === this.readables.length) {
      return Promise.resolve();
    }
    if (retry === 0) {
      return Promise.reject(`Timeout waiting for ${this.readables.length} readables to end - got ${this._endedCount}`);
    }
    await delay(500);
    return this.done(retry - 1);
  }
Другие вопросы по тегам