Как правильно справиться с обратным давлением во время `Transform#flush`
Как правильно справляться с обратным давлением в рамках реализации Transform _flush
метод? Другими словами, если .push()
при промывке возвращает false, существуют ли какие-либо механизмы для правильной обработки обратного давления из нисходящего потока?
Документация предписывает прекратить толкание, как только .push()
возвращает false, но тогда у Transform нет средств для прослушивания, когда нижестоящий поток хочет возобновить чтение, кроме как для переопределения this.read
; но как бы это выглядело и есть ли опасность для этого?
Вот рабочий пример, с которым вы можете играть.
const stream = require('stream');
// a string large enough to overflow the buffer
const S_OVERFLOW = '-'.repeat((new stream.Readable()).readableHighWaterMark+1);
class example extends stream.Transform {
constructor() {
super({
writableObjectMode: true,
});
// some internal queue that will be emptied once writable side ends
Object.assign(this, {
internal_queue: [],
});
}
_transform(g_chunk, s_encoding, fk_transform) {
// store chunk in internal queue
this.internal_queue.push(g_chunk);
// done with transform (no writes)
fk_transform();
}
_flush(fk_flush) {
console.warn('starting to flush');
// now that writable side has ended, flush internal queue
this.resumeFlush(fk_flush);
}
resumeFlush(fk_flush) {
let a_queue = this.internal_queue;
// still data left in internal queue
while(a_queue.length) {
// remove an item from queue
a_queue.pop();
// intentionally overflow buffer
if(!this.push(S_OVERFLOW)) {
//
// WHAT TO DO HERE?
//
// go asynchronous
return;
}
}
console.warn('finished flush');
// callback
fk_flush();
}
}
// instantiate transform
let ds_transform = new example();
// pipe to stdout
ds_transform.pipe(process.stdout);
// write some data (needs to happen twice)
ds_transform.write({
item: 0,
});
ds_transform.write({
item: 1,
});
// end stream
ds_transform.end();
Трубопровод для выхода на /dev/null
так что stderr все еще выводит на консоль:
$ node transform.js > /dev/null
starting to flush
1 ответ
Настоящая проблема здесь в том, что вы должны использовать дуплекс, а не преобразование. Поскольку каждый звонок _transform
на самом деле буферизует данные, а не применяет к ним некоторое (/) синхронное преобразование, этот тип реализации лучше подходит в качестве дуплекса, где _write()
данные буфера и вызовы _read()
начните нажимать, пока не будет обнаружено противодавление.
const stream = require('stream');
// a string large enough to overflow the buffer
const S_OVERFLOW = '-'.repeat((new stream.Readable()).readableHighWaterMark+1);
class example extends stream.Duplex {
constructor() {
super({
writableObjectMode: true,
});
// some internal queue that will be emptied once writable side ends
Object.assign(this, {
internal_queue: [],
});
}
_write(g_chunk, s_encoding, fk_write) {
// store chunk in internal queue
this.internal_queue.push(g_chunk);
// done with transform (no writes)
fk_write();
}
_read() {
console.warn('called _read()');
let a_queue = this.internal_queue;
// still data left in internal queue
while(a_queue.length) {
// remove an item from queue
a_queue.pop();
// intentionally overflow buffer
if(!this.push(S_OVERFLOW)) {
// go asynchronous
return;
}
}
console.warn('finished reading');
// nothing more to read
this.push(null);
}
}
// instantiate transform
let ds_transform = new example();
// pipe to stdout
ds_transform.pipe(process.stdout);
// write some data (needs to happen twice)
ds_transform.write({
item: 0,
});
ds_transform.write({
item: 1,
});
// end stream
ds_transform.end();
Тогда вы получите:
$ node duplex.js > /dev/null
called _read()
called _read()
called _read()
finished reading