Как правильно обрабатывать обратное давление в потоке преобразования файла node.js?
вступление
Это мои первые приключения в написании серверной части node.js. Пока это было весело, но у меня возникли некоторые трудности с пониманием правильного способа реализации чего-либо, связанного с потоками node.js.
проблема
В целях тестирования и обучения я работаю с большими файлами, содержимое которых сжато zlib. Сжатый контент представляет собой двоичные данные, каждыйпакет имеет длину 38 байт. Я пытаюсь создать результирующий файл, который выглядит практически идентично исходному файлу, за исключением того, что для каждых 1024 пакетов по38 байтов есть несжатый 31-байтовый заголовок.
исходное содержимое файла (распаковано)
+----------+----------+----------+----------+
| packet 1 | packet 2 | ...... | packet N |
| 38 bytes | 38 bytes | ...... | 38 bytes |
+----------+----------+----------+----------+
итоговое содержимое файла
+----------+--------------------------------+----------+--------------------------------+
| header 1 | 1024 38 byte packets | header 2 | 1024 38 byte packets |
| 31 bytes | zlib compressed | 31 bytes | zlib compressed |
+----------+--------------------------------+----------+--------------------------------+
Как видите, это в некоторой степени проблема перевода. Это означает, что я принимаю некоторый исходный поток в качестве входных данных, а затем немного преобразовываю его в некоторый выходной поток. Поэтому было естественным реализовать поток Transform.
Класс просто пытается выполнить следующее:
- Принимает поток в качестве входных данных
- zlib раздувает порции данных, чтобы подсчитать количество пакетов, собрать 1024 из них, дефлировать zlib и добавить заголовок.
- Передача нового полученного фрагмента по конвейеру через
this.push(chunk)
,
Вариант использования будет что-то вроде:
var fs = require('fs');
var me = require('./me'); // Where my Transform stream code sits
var inp = fs.createReadStream('depth_1000000');
var out = fs.createWriteStream('depth_1000000.out');
inp.pipe(me.createMyTranslate()).pipe(out);
Вопросы)
Предполагая, что Transform является хорошим выбором для этого варианта использования, я, похоже, столкнулся с возможной проблемой обратного давления. Мой звонок this.push(chunk)
в _transform
продолжает возвращаться false
, С чего бы это и как обращаться с такими вещами?
8 ответов
Этот вопрос 2013 года - это все, что мне удалось найти о том, как бороться с "обратным давлением" при создании потоков Transform узла.
С узла 7.10.0 Transform stream и документация Readable stream я понял, что однажды push
возвращено false, больше ничего не нужно нажимать, пока _read
назывался.
В документации Transform не упоминается _read
за исключением упоминания того, что базовый класс Transform реализует его (и _write). Я нашел информацию о push
возвращая ложное и _read
вызывается в документации для чтения потоков.
Единственный другой авторитетный комментарий, который я нашел по обратному давлению Transform, только упомянул об этом как о проблеме, и это было в комментарии в верхней части файла узла _stream_transform.js.
Вот раздел о противодействии этого комментария:
// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk. However,
// a pathological inflate type of transform can cause excessive buffering
// here. For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output. In this case, you could write a very small
// amount of input, and end up with a very large amount of output. In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform. A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.
Пример решения
Вот решение, которое я собрал, чтобы справиться с обратным давлением в потоке Transform, которое, я уверен, работает. (Я не написал никаких реальных тестов, которые потребовали бы написания потока для записи для контроля противодавления.)
Это элементарное Преобразование Линии, которое нуждается в работе как преобразование Линии, но демонстрирует обработку "обратного давления".
const stream = require('stream');
class LineTransform extends stream.Transform
{
constructor(options)
{
super(options);
this._lastLine = "";
this._continueTransform = null;
this._transforming = false;
this._debugTransformCallCount = 0;
}
_transform(chunk, encoding, callback)
{
if (encoding === "buffer")
return callback(new Error("Buffer chunks not supported"));
if (this._continueTransform !== null)
return callback(new Error("_transform called before previous transform has completed."));
// DEBUG: Uncomment for debugging help to see what's going on
//console.error(`${++this._debugTransformCallCount} _transform called:`);
// Guard (so we don't call _continueTransform from _read while it is being
// invoked from _transform)
this._transforming = true;
// Do our transforming (in this case splitting the big chunk into lines)
let lines = (this._lastLine + chunk).split(/\r\n|\n/);
this._lastLine = lines.pop();
// In order to respond to "back pressure" create a function
// that will push all of the lines stopping when push returns false,
// and then resume where it left off when called again, only calling
// the "callback" once all lines from this transform have been pushed.
// Resuming (until done) will be done by _read().
let nextLine = 0;
this._continueTransform = () =>
{
let backpressure = false;
while (nextLine < lines.length)
{
if (!this.push(lines[nextLine++] + "\n"))
{
// we've got more to push, but we got backpressure so it has to wait.
if (backpressure)
return;
backpressure = !this.push(lines[nextLine++] + "\n");
}
}
// DEBUG: Uncomment for debugging help to see what's going on
//console.error(`_continueTransform ${this._debugTransformCallCount} finished\n`);
// All lines are pushed, remove this function from the LineTransform instance
this._continueTransform = null;
return callback();
};
// Start pushing the lines
this._continueTransform();
// Turn off guard allowing _read to continue the transform pushes if needed.
this._transforming = false;
}
_flush(callback)
{
if (this._lastLine.length > 0)
{
this.push(this._lastLine);
this._lastLine = "";
}
return callback();
}
_read(size)
{
// DEBUG: Uncomment for debugging help to see what's going on
//if (this._transforming)
// console.error(`_read called during _transform ${this._debugTransformCallCount}`);
// If a transform has not pushed every line yet, continue that transform
// otherwise just let the base class implementation do its thing.
if (!this._transforming && this._continueTransform !== null)
this._continueTransform();
else
super._read(size);
}
}
Я проверил вышеизложенное, запустив его с некомментированными строками DEBUG в файле ~10000 строк ~200 КБ. Перенаправьте stdout или stderr в файл (или оба), чтобы отделить операторы отладки от ожидаемого вывода. (node test.js > out.log 2> err.log
)
const fs = require('fs');
let inStrm = fs.createReadStream("testdata/largefile.txt", { encoding: "utf8" });
let lineStrm = new LineTransform({ encoding: "utf8", decodeStrings: false });
inStrm.pipe(lineStrm).pipe(process.stdout);
Полезный совет по отладке
Когда я писал это изначально, я не осознавал, что _read
можно было вызвать раньше _transform
вернулся, поэтому я не реализовал this._transforming
Охранник и я получили следующую ошибку:
Error: no writecb in Transform class
at afterTransform (_stream_transform.js:71:33)
at TransformState.afterTransform (_stream_transform.js:54:12)
at LineTransform._continueTransform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:44:13)
at LineTransform._transform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:46:21)
at LineTransform.Transform._read (_stream_transform.js:167:10)
at LineTransform._read (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:56:15)
at LineTransform.Transform._write (_stream_transform.js:155:12)
at doWrite (_stream_writable.js:331:12)
at writeOrBuffer (_stream_writable.js:317:5)
at LineTransform.Writable.write (_stream_writable.js:243:11)
Глядя на реализацию узла, я понял, что эта ошибка означает, что обратный вызов, данный _transform
звонили не раз. Об этой ошибке также не было много информации, поэтому я решил включить сюда то, что выяснил.
push
вернет false, если поток, в который вы пишете (в данном случае, поток вывода файла), имеет слишком много буферизованных данных. Поскольку вы пишете на диск, это имеет смысл: вы обрабатываете данные быстрее, чем можете их записать.
когда out
Буфер заполнен, ваш поток преобразования не сможет нажать и начать буферизацию данных. Если этот буфер должен заполниться, то inp
х начнёт заполнять. Вот как все должно работать. Передаваемые по конвейеру потоки обрабатывают данные только так быстро, как это может обработать самое медленное звено в цепочке (когда ваши буферы заполнены).
Я думаю Transform
подходит для этого, но я бы выполнил раздувание как отдельный шаг в конвейере.
Вот быстрый и в значительной степени непроверенный пример:
var zlib = require('zlib');
var stream = require('stream');
var transformer = new stream.Transform();
// Properties used to keep internal state of transformer.
transformer._buffers = [];
transformer._inputSize = 0;
transformer._targetSize = 1024 * 38;
// Dump one 'output packet'
transformer._dump = function(done) {
// concatenate buffers and convert to binary string
var buffer = Buffer.concat(this._buffers).toString('binary');
// Take first 1024 packets.
var packetBuffer = buffer.substring(0, this._targetSize);
// Keep the rest and reset counter.
this._buffers = [ new Buffer(buffer.substring(this._targetSize)) ];
this._inputSize = this._buffers[0].length;
// output header
this.push('HELLO WORLD');
// output compressed packet buffer
zlib.deflate(packetBuffer, function(err, compressed) {
// TODO: handle `err`
this.push(compressed);
if (done) {
done();
}
}.bind(this));
};
// Main transformer logic: buffer chunks and dump them once the
// target size has been met.
transformer._transform = function(chunk, encoding, done) {
this._buffers.push(chunk);
this._inputSize += chunk.length;
if (this._inputSize >= this._targetSize) {
this._dump(done);
} else {
done();
}
};
// Flush any remaining buffers.
transformer._flush = function() {
this._dump();
};
// Example:
var fs = require('fs');
fs.createReadStream('depth_1000000')
.pipe(zlib.createInflate())
.pipe(transformer)
.pipe(fs.createWriteStream('depth_1000000.out'));
В последнее время столкнулся с подобной проблемой, которая требовала обработки противодавления в раздувающемся потоке преобразования - секрет обработки push()
возвращение ложного - это регистрация и обработка 'drain'
событие в потоке
_transform(data, enc, callback) {
const continueTransforming = () => {
... do some work / parse the data, keep state of where we're at etc
if(!this.push(event))
this._readableState.pipes.once('drain', continueTransforming); // will get called again when the reader can consume more data
if(allDone)
callback();
}
continueTransforming()
}
ЗАМЕТЬТЕ, что это немного глупо, поскольку мы достигаем внутренних органов и pipes
может даже быть массивом Readable
с, но это работает в общем случае ....pipe(transform).pipe(...
Было бы здорово, если бы кто-то из сообщества Node мог предложить "правильный" метод для обработки .push()
возвращение ложного
В итоге я последовал примеру Ледиона и создал вспомогательный класс Transform, который помогает с противодавлением. Утилита добавляет асинхронный метод с именем addData, который может ожидать реализующий Transform.
'use strict';
const { Transform } = require('stream');
/**
* The BackPressureTransform class adds a utility method addData which
* allows for pushing data to the Readable, while honoring back-pressure.
*/
class BackPressureTransform extends Transform {
constructor(...args) {
super(...args);
}
/**
* Asynchronously add a chunk of data to the output, honoring back-pressure.
*
* @param {String} data
* The chunk of data to add to the output.
*
* @returns {Promise<void>}
* A Promise resolving after the data has been added.
*/
async addData(data) {
// if .push() returns false, it means that the readable buffer is full
// when this occurs, we must wait for the internal readable to emit
// the 'drain' event, signalling the readable is ready for more data
if (!this.push(data)) {
await new Promise((resolve, reject) => {
const errorHandler = error => {
this.emit('error', error);
reject();
};
const boundErrorHandler = errorHandler.bind(this);
this._readableState.pipes.on('error', boundErrorHandler);
this._readableState.pipes.once('drain', () => {
this._readableState.pipes.removeListener('error', boundErrorHandler);
resolve();
});
});
}
}
}
module.exports = {
BackPressureTransform
};
Используя этот служебный класс, мои Transforms теперь выглядят так:
'use strict';
const { BackPressureTransform } = require('./back-pressure-transform');
/**
* The Formatter class accepts the transformed row to be added to the output file.
* The class provides generic support for formatting the result file.
*/
class Formatter extends BackPressureTransform {
constructor() {
super({
encoding: 'utf8',
readableObjectMode: false,
writableObjectMode: true
});
this.anyObjectsWritten = false;
}
/**
* Called when the data pipeline is complete.
*
* @param {Function} callback
* The function which is called when final processing is complete.
*
* @returns {Promise<void>}
* A Promise resolving after the flush completes.
*/
async _flush(callback) {
// if any object is added, close the surrounding array
if (this.anyObjectsWritten) {
await this.addData('\n]');
}
callback(null);
}
/**
* Given the transformed row from the ETL, format it to the desired layout.
*
* @param {Object} sourceRow
* The transformed row from the ETL.
*
* @param {String} encoding
* Ignored in object mode.
*
* @param {Function} callback
* The callback function which is called when the formatting is complete.
*
* @returns {Promise<void>}
* A Promise resolving after the row is transformed.
*/
async _transform(sourceRow, encoding, callback) {
// before the first object is added, surround the data as an array
// between each object, add a comma separator
await this.addData(this.anyObjectsWritten ? ',\n' : '[\n');
// update state
this.anyObjectsWritten = true;
// add the object to the output
const parsed = JSON.stringify(sourceRow, null, 2).split('\n');
for (const [index, row] of parsed.entries()) {
// prepend the row with 2 additional spaces since we're inside a larger array
await this.addData(` ${row}`);
// add line breaks except for the last row
if (index < parsed.length - 1) {
await this.addData('\n');
}
}
callback(null);
}
}
module.exports = {
Formatter
};
Я нашел решение, похожее на решение Ледиона, без необходимости углубляться во внутренности текущего потокового конвейера. Вы можете достичь этого с помощью:
_transform(data, enc, callback) {
const continueTransforming = () => {
// ... do some work / parse the data, keep state of where we're at etc
if(!this.push(event))
this.once('data', continueTransforming); // will get called again when the reader can consume more data
if(allDone)
callback();
}
continueTransforming()
}
Это работает, потому чтоdata
испускается только тогда, когда кто-то нижестоящий использует доступный для чтения буфер, который выthis.push()
-ing к. Поэтому всякий раз, когда у нисходящего потока есть возможность извлечь из этого буфера, вы должны иметь возможность начать обратную запись в буфер.
Недостаток прослушивания в нисходящем направлении (кроме проникновения внутрь узла) заключается в том, что вы также полагаетесь на своиTransform
буфер также был опустошен, что не гарантируется, когда нисходящий поток испускаетdrain
.
Думаю, ответ Майка Липперта наиболее близок к истине. Похоже, ждем нового_read()
вызов для начала из потока чтения - единственный способ Transform
активно уведомляется о том, что читатель готов. Я хотел поделиться простым примером того, как я переопределяю_read()
временно.
_transform(buf, enc, callback) {
// prepend any unused data from the prior chunk.
if (this.prev) {
buf = Buffer.concat([ this.prev, buf ]);
this.prev = null;
}
// will keep transforming until buf runs low on data.
if (buf.length < this.requiredData) {
this.prev = buf;
return callback();
}
var result = // do something with data...
var nextbuf = buf.slice(this.requiredData);
if (this.push(result)) {
// Continue transforming this chunk
this._transform(nextbuf, enc, callback);
}
else {
// Node is warning us to slow down (applying "backpressure")
// Temporarily override _read request to continue the transform
this._read = function() {
delete this._read;
this._transform(nextbuf, enc, callback);
};
}
}
Я пытался найти комментарий, упомянутый в исходном коде для преобразования, и ссылка на ссылку продолжает меняться, поэтому я оставлю это здесь для справки:
// a transform stream is a readable/writable stream where you do
// something with the data. Sometimes it's called a "filter",
// but that's not a great name for it, since that implies a thing where
// some bits pass through, and others are simply ignored. (That would
// be a valid example of a transform, of course.)
//
// While the output is causally related to the input, it's not a
// necessarily symmetric or synchronous transformation. For example,
// a zlib stream might take multiple plain-text writes(), and then
// emit a single compressed chunk some time in the future.
//
// Here's how this works:
//
// The Transform stream has all the aspects of the readable and writable
// stream classes. When you write(chunk), that calls _write(chunk,cb)
// internally, and returns false if there's a lot of pending writes
// buffered up. When you call read(), that calls _read(n) until
// there's enough pending readable data buffered up.
//
// In a transform stream, the written data is placed in a buffer. When
// _read(n) is called, it transforms the queued up data, calling the
// buffered _write cb's as it consumes chunks. If consuming a single
// written chunk would result in multiple output chunks, then the first
// outputted bit calls the readcb, and subsequent chunks just go into
// the read buffer, and will cause it to emit 'readable' if necessary.
//
// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk. However,
// a pathological inflate type of transform can cause excessive buffering
// here. For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output. In this case, you could write a very small
// amount of input, and end up with a very large amount of output. In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform. A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.