Как правильно обрабатывать обратное давление в потоке преобразования файла node.js?

вступление

Это мои первые приключения в написании серверной части node.js. Пока это было весело, но у меня возникли некоторые трудности с пониманием правильного способа реализации чего-либо, связанного с потоками node.js.

проблема

В целях тестирования и обучения я работаю с большими файлами, содержимое которых сжато zlib. Сжатый контент представляет собой двоичные данные, каждыйпакет имеет длину 38 байт. Я пытаюсь создать результирующий файл, который выглядит практически идентично исходному файлу, за исключением того, что для каждых 1024 пакетов по38 байтов есть несжатый 31-байтовый заголовок.

исходное содержимое файла (распаковано)

+----------+----------+----------+----------+
| packet 1 | packet 2 |  ......  | packet N |
| 38 bytes | 38 bytes |  ......  | 38 bytes |
+----------+----------+----------+----------+

итоговое содержимое файла

+----------+--------------------------------+----------+--------------------------------+
| header 1 |    1024 38 byte packets        | header 2 |    1024 38 byte packets        |
| 31 bytes |       zlib compressed          | 31 bytes |       zlib compressed          |
+----------+--------------------------------+----------+--------------------------------+

Как видите, это в некоторой степени проблема перевода. Это означает, что я принимаю некоторый исходный поток в качестве входных данных, а затем немного преобразовываю его в некоторый выходной поток. Поэтому было естественным реализовать поток Transform.

Класс просто пытается выполнить следующее:

  1. Принимает поток в качестве входных данных
  2. zlib раздувает порции данных, чтобы подсчитать количество пакетов, собрать 1024 из них, дефлировать zlib и добавить заголовок.
  3. Передача нового полученного фрагмента по конвейеру через this.push(chunk),

Вариант использования будет что-то вроде:

var fs = require('fs');
var me = require('./me'); // Where my Transform stream code sits
var inp = fs.createReadStream('depth_1000000');
var out = fs.createWriteStream('depth_1000000.out');
inp.pipe(me.createMyTranslate()).pipe(out);

Вопросы)

Предполагая, что Transform является хорошим выбором для этого варианта использования, я, похоже, столкнулся с возможной проблемой обратного давления. Мой звонок this.push(chunk) в _transform продолжает возвращаться false, С чего бы это и как обращаться с такими вещами?

8 ответов

Этот вопрос 2013 года - это все, что мне удалось найти о том, как бороться с "обратным давлением" при создании потоков Transform узла.

С узла 7.10.0 Transform stream и документация Readable stream я понял, что однажды push возвращено false, больше ничего не нужно нажимать, пока _read назывался.

В документации Transform не упоминается _read за исключением упоминания того, что базовый класс Transform реализует его (и _write). Я нашел информацию о push возвращая ложное и _read вызывается в документации для чтения потоков.

Единственный другой авторитетный комментарий, который я нашел по обратному давлению Transform, только упомянул об этом как о проблеме, и это было в комментарии в верхней части файла узла _stream_transform.js.

Вот раздел о противодействии этого комментария:

// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk.  However,
// a pathological inflate type of transform can cause excessive buffering
// here.  For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output.  Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output.  In this case, you could write a very small
// amount of input, and end up with a very large amount of output.  In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform.  A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.

Пример решения

Вот решение, которое я собрал, чтобы справиться с обратным давлением в потоке Transform, которое, я уверен, работает. (Я не написал никаких реальных тестов, которые потребовали бы написания потока для записи для контроля противодавления.)

Это элементарное Преобразование Линии, которое нуждается в работе как преобразование Линии, но демонстрирует обработку "обратного давления".

const stream = require('stream');

class LineTransform extends stream.Transform
{
    constructor(options)
    {
        super(options);

        this._lastLine = "";
        this._continueTransform = null;
        this._transforming = false;
        this._debugTransformCallCount = 0;
    }

    _transform(chunk, encoding, callback)
    {
        if (encoding === "buffer")
            return callback(new Error("Buffer chunks not supported"));

        if (this._continueTransform !== null)
            return callback(new Error("_transform called before previous transform has completed."));

        // DEBUG: Uncomment for debugging help to see what's going on
        //console.error(`${++this._debugTransformCallCount} _transform called:`);

        // Guard (so we don't call _continueTransform from _read while it is being
        // invoked from _transform)
        this._transforming = true;

        // Do our transforming (in this case splitting the big chunk into lines)
        let lines = (this._lastLine + chunk).split(/\r\n|\n/);
        this._lastLine = lines.pop();

        // In order to respond to "back pressure" create a function
        // that will push all of the lines stopping when push returns false,
        // and then resume where it left off when called again, only calling
        // the "callback" once all lines from this transform have been pushed.
        // Resuming (until done) will be done by _read().
        let nextLine = 0;
        this._continueTransform = () =>
            {
                let backpressure = false;
                while (nextLine < lines.length)
                {

                    if (!this.push(lines[nextLine++] + "\n"))
                    {
                        // we've got more to push, but we got backpressure so it has to wait.
                        if (backpressure)
                            return;

                        backpressure = !this.push(lines[nextLine++] + "\n");
                    }
                }

                // DEBUG: Uncomment for debugging help to see what's going on
                //console.error(`_continueTransform ${this._debugTransformCallCount} finished\n`);

                // All lines are pushed, remove this function from the LineTransform instance
                this._continueTransform = null;
                return callback();
            };

        // Start pushing the lines
        this._continueTransform();

        // Turn off guard allowing _read to continue the transform pushes if needed.
        this._transforming = false;
    }

    _flush(callback)
    {
        if (this._lastLine.length > 0)
        {
            this.push(this._lastLine);
            this._lastLine = "";
        }

        return callback();
    }

    _read(size)
    {
        // DEBUG: Uncomment for debugging help to see what's going on
        //if (this._transforming)
        //    console.error(`_read called during _transform ${this._debugTransformCallCount}`);

        // If a transform has not pushed every line yet, continue that transform
        // otherwise just let the base class implementation do its thing.
        if (!this._transforming && this._continueTransform !== null)
            this._continueTransform();
        else
            super._read(size);
    }
}

Я проверил вышеизложенное, запустив его с некомментированными строками DEBUG в файле ~10000 строк ~200 КБ. Перенаправьте stdout или stderr в файл (или оба), чтобы отделить операторы отладки от ожидаемого вывода. (node test.js > out.log 2> err.log)

const fs = require('fs');
let inStrm = fs.createReadStream("testdata/largefile.txt", { encoding: "utf8" });
let lineStrm = new LineTransform({ encoding: "utf8", decodeStrings: false });
inStrm.pipe(lineStrm).pipe(process.stdout);

Полезный совет по отладке

Когда я писал это изначально, я не осознавал, что _read можно было вызвать раньше _transform вернулся, поэтому я не реализовал this._transforming Охранник и я получили следующую ошибку:

Error: no writecb in Transform class
    at afterTransform (_stream_transform.js:71:33)
    at TransformState.afterTransform (_stream_transform.js:54:12)
    at LineTransform._continueTransform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:44:13)
    at LineTransform._transform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:46:21)
    at LineTransform.Transform._read (_stream_transform.js:167:10)
    at LineTransform._read (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:56:15)
    at LineTransform.Transform._write (_stream_transform.js:155:12)
    at doWrite (_stream_writable.js:331:12)
    at writeOrBuffer (_stream_writable.js:317:5)
    at LineTransform.Writable.write (_stream_writable.js:243:11)

Глядя на реализацию узла, я понял, что эта ошибка означает, что обратный вызов, данный _transform звонили не раз. Об этой ошибке также не было много информации, поэтому я решил включить сюда то, что выяснил.

push вернет false, если поток, в который вы пишете (в данном случае, поток вывода файла), имеет слишком много буферизованных данных. Поскольку вы пишете на диск, это имеет смысл: вы обрабатываете данные быстрее, чем можете их записать.

когда outБуфер заполнен, ваш поток преобразования не сможет нажать и начать буферизацию данных. Если этот буфер должен заполниться, то inpх начнёт заполнять. Вот как все должно работать. Передаваемые по конвейеру потоки обрабатывают данные только так быстро, как это может обработать самое медленное звено в цепочке (когда ваши буферы заполнены).

Я думаю Transform подходит для этого, но я бы выполнил раздувание как отдельный шаг в конвейере.

Вот быстрый и в значительной степени непроверенный пример:

var zlib        = require('zlib');
var stream      = require('stream');
var transformer = new stream.Transform();

// Properties used to keep internal state of transformer.
transformer._buffers    = [];
transformer._inputSize  = 0;
transformer._targetSize = 1024 * 38;

// Dump one 'output packet'
transformer._dump       = function(done) {
  // concatenate buffers and convert to binary string
  var buffer = Buffer.concat(this._buffers).toString('binary');

  // Take first 1024 packets.
  var packetBuffer = buffer.substring(0, this._targetSize);

  // Keep the rest and reset counter.
  this._buffers   = [ new Buffer(buffer.substring(this._targetSize)) ];
  this._inputSize = this._buffers[0].length;

  // output header
  this.push('HELLO WORLD');

  // output compressed packet buffer
  zlib.deflate(packetBuffer, function(err, compressed) {
    // TODO: handle `err`
    this.push(compressed);
    if (done) {
      done();
    }
  }.bind(this));
};

// Main transformer logic: buffer chunks and dump them once the
// target size has been met.
transformer._transform  = function(chunk, encoding, done) {
  this._buffers.push(chunk);
  this._inputSize += chunk.length;

  if (this._inputSize >= this._targetSize) {
    this._dump(done);
  } else {
    done();
  }
};

// Flush any remaining buffers.
transformer._flush = function() {
  this._dump();
};

// Example:
var fs = require('fs');
fs.createReadStream('depth_1000000')
  .pipe(zlib.createInflate())
  .pipe(transformer)
  .pipe(fs.createWriteStream('depth_1000000.out'));

В последнее время столкнулся с подобной проблемой, которая требовала обработки противодавления в раздувающемся потоке преобразования - секрет обработки push() возвращение ложного - это регистрация и обработка 'drain' событие в потоке

_transform(data, enc, callback) {
  const continueTransforming = () => {
    ... do some work / parse the data, keep state of where we're at etc
    if(!this.push(event)) 
         this._readableState.pipes.once('drain', continueTransforming); // will get called again when the reader can consume more data
    if(allDone)
       callback();
  }
  continueTransforming()
}

ЗАМЕТЬТЕ, что это немного глупо, поскольку мы достигаем внутренних органов и pipes может даже быть массивом Readableс, но это работает в общем случае ....pipe(transform).pipe(...

Было бы здорово, если бы кто-то из сообщества Node мог предложить "правильный" метод для обработки .push() возвращение ложного

В итоге я последовал примеру Ледиона и создал вспомогательный класс Transform, который помогает с противодавлением. Утилита добавляет асинхронный метод с именем addData, который может ожидать реализующий Transform.

'use strict';

const { Transform } = require('stream');

/**
 * The BackPressureTransform class adds a utility method addData which
 * allows for pushing data to the Readable, while honoring back-pressure.
 */
class BackPressureTransform extends Transform {
  constructor(...args) {
    super(...args);
  }

  /**
   * Asynchronously add a chunk of data to the output, honoring back-pressure.
   *
   * @param {String} data
   * The chunk of data to add to the output.
   *
   * @returns {Promise<void>}
   * A Promise resolving after the data has been added.
   */
  async addData(data) {
    // if .push() returns false, it means that the readable buffer is full
    // when this occurs, we must wait for the internal readable to emit
    // the 'drain' event, signalling the readable is ready for more data
    if (!this.push(data)) {
      await new Promise((resolve, reject) => {
        const errorHandler = error => {
          this.emit('error', error);
          reject();
        };
        const boundErrorHandler = errorHandler.bind(this);

        this._readableState.pipes.on('error', boundErrorHandler);
        this._readableState.pipes.once('drain', () => {
          this._readableState.pipes.removeListener('error', boundErrorHandler);
          resolve();
        });
      });
    }
  }
}

module.exports = {
  BackPressureTransform
};

Используя этот служебный класс, мои Transforms теперь выглядят так:

'use strict';

const { BackPressureTransform } = require('./back-pressure-transform');

/**
 * The Formatter class accepts the transformed row to be added to the output file.
 * The class provides generic support for formatting the result file.
 */
class Formatter extends BackPressureTransform {
  constructor() {
    super({
      encoding: 'utf8',
      readableObjectMode: false,
      writableObjectMode: true
    });

    this.anyObjectsWritten = false;
  }

  /**
   * Called when the data pipeline is complete.
   *
   * @param {Function} callback
   * The function which is called when final processing is complete.
   *
   * @returns {Promise<void>}
   * A Promise resolving after the flush completes.
   */
  async _flush(callback) {
    // if any object is added, close the surrounding array
    if (this.anyObjectsWritten) {
      await this.addData('\n]');
    }

    callback(null);
  }

  /**
   * Given the transformed row from the ETL, format it to the desired layout.
   *
   * @param {Object} sourceRow
   * The transformed row from the ETL.
   *
   * @param {String} encoding
   * Ignored in object mode.
   *
   * @param {Function} callback
   * The callback function which is called when the formatting is complete.
   *
   * @returns {Promise<void>}
   * A Promise resolving after the row is transformed.
   */
  async _transform(sourceRow, encoding, callback) {
    // before the first object is added, surround the data as an array
    // between each object, add a comma separator
    await this.addData(this.anyObjectsWritten ? ',\n' : '[\n');

    // update state
    this.anyObjectsWritten = true;

    // add the object to the output
    const parsed = JSON.stringify(sourceRow, null, 2).split('\n');
    for (const [index, row] of parsed.entries()) {
      // prepend the row with 2 additional spaces since we're inside a larger array
      await this.addData(`  ${row}`);

      // add line breaks except for the last row
      if (index < parsed.length - 1) {
        await this.addData('\n');
      }
    }

    callback(null);
  }
}

module.exports = {
  Formatter
};

Я нашел решение, похожее на решение Ледиона, без необходимости углубляться во внутренности текущего потокового конвейера. Вы можете достичь этого с помощью:

      _transform(data, enc, callback) {
  const continueTransforming = () => {
    // ... do some work / parse the data, keep state of where we're at etc
    if(!this.push(event)) 
         this.once('data', continueTransforming); // will get called again when the reader can consume more data
    if(allDone)
       callback();
  }
  continueTransforming()
}

Это работает, потому чтоdataиспускается только тогда, когда кто-то нижестоящий использует доступный для чтения буфер, который выthis.push()-ing к. Поэтому всякий раз, когда у нисходящего потока есть возможность извлечь из этого буфера, вы должны иметь возможность начать обратную запись в буфер.

Недостаток прослушивания в нисходящем направлении (кроме проникновения внутрь узла) заключается в том, что вы также полагаетесь на своиTransformбуфер также был опустошен, что не гарантируется, когда нисходящий поток испускаетdrain.

Думаю, ответ Майка Липперта наиболее близок к истине. Похоже, ждем нового_read() вызов для начала из потока чтения - единственный способ Transformактивно уведомляется о том, что читатель готов. Я хотел поделиться простым примером того, как я переопределяю_read() временно.

_transform(buf, enc, callback) {

  // prepend any unused data from the prior chunk.
  if (this.prev) {
    buf = Buffer.concat([ this.prev, buf ]);
    this.prev = null;
  }

  // will keep transforming until buf runs low on data.
  if (buf.length < this.requiredData) {
    this.prev = buf;
    return callback();
  }

  var result = // do something with data...
  var nextbuf = buf.slice(this.requiredData);

  if (this.push(result)) {
    // Continue transforming this chunk
    this._transform(nextbuf, enc, callback);
  }
  else {
    // Node is warning us to slow down (applying "backpressure")
    // Temporarily override _read request to continue the transform
    this._read = function() {
        delete this._read;
        this._transform(nextbuf, enc, callback);
    };
  }
}

Я пытался найти комментарий, упомянутый в исходном коде для преобразования, и ссылка на ссылку продолжает меняться, поэтому я оставлю это здесь для справки:

      // a transform stream is a readable/writable stream where you do
// something with the data.  Sometimes it's called a "filter",
// but that's not a great name for it, since that implies a thing where
// some bits pass through, and others are simply ignored.  (That would
// be a valid example of a transform, of course.)
//
// While the output is causally related to the input, it's not a
// necessarily symmetric or synchronous transformation.  For example,
// a zlib stream might take multiple plain-text writes(), and then
// emit a single compressed chunk some time in the future.
//
// Here's how this works:
//
// The Transform stream has all the aspects of the readable and writable
// stream classes.  When you write(chunk), that calls _write(chunk,cb)
// internally, and returns false if there's a lot of pending writes
// buffered up.  When you call read(), that calls _read(n) until
// there's enough pending readable data buffered up.
//
// In a transform stream, the written data is placed in a buffer.  When
// _read(n) is called, it transforms the queued up data, calling the
// buffered _write cb's as it consumes chunks.  If consuming a single
// written chunk would result in multiple output chunks, then the first
// outputted bit calls the readcb, and subsequent chunks just go into
// the read buffer, and will cause it to emit 'readable' if necessary.
//
// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk.  However,
// a pathological inflate type of transform can cause excessive buffering
// here.  For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output.  Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output.  In this case, you could write a very small
// amount of input, and end up with a very large amount of output.  In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform.  A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.
Другие вопросы по тегам