Как правильно справиться с обратным давлением во время `Transform#flush`

Как правильно справляться с обратным давлением в рамках реализации Transform _flush метод? Другими словами, если .push() при промывке возвращает false, существуют ли какие-либо механизмы для правильной обработки обратного давления из нисходящего потока?

Документация предписывает прекратить толкание, как только .push() возвращает false, но тогда у Transform нет средств для прослушивания, когда нижестоящий поток хочет возобновить чтение, кроме как для переопределения this.read; но как бы это выглядело и есть ли опасность для этого?

Вот рабочий пример, с которым вы можете играть.

const stream = require('stream');

// a string large enough to overflow the buffer
const S_OVERFLOW = '-'.repeat((new stream.Readable()).readableHighWaterMark+1);


class example extends stream.Transform {
    constructor() {
        super({
            writableObjectMode: true,
        });

        // some internal queue that will be emptied once writable side ends
        Object.assign(this, {
            internal_queue: [],
        });
    }

    _transform(g_chunk, s_encoding, fk_transform) {
        // store chunk in internal queue
        this.internal_queue.push(g_chunk);

        // done with transform (no writes)
        fk_transform();
    }

    _flush(fk_flush) {
        console.warn('starting to flush');

        // now that writable side has ended, flush internal queue
        this.resumeFlush(fk_flush);
    }

    resumeFlush(fk_flush) {
        let a_queue = this.internal_queue;

        // still data left in internal queue
        while(a_queue.length) {
            // remove an item from queue
            a_queue.pop();

            // intentionally overflow buffer
            if(!this.push(S_OVERFLOW)) {
                //
                // WHAT TO DO HERE?
                //

                // go asynchronous
                return;
            }
        }

        console.warn('finished flush');

        // callback
        fk_flush();
    }
}


// instantiate transform
let ds_transform = new example();

// pipe to stdout
ds_transform.pipe(process.stdout);

// write some data (needs to happen twice)
ds_transform.write({
    item: 0,
});

ds_transform.write({
    item: 1,
});

// end stream
ds_transform.end();

Трубопровод для выхода на /dev/null так что stderr все еще выводит на консоль:

$ node transform.js > /dev/null
starting to flush

1 ответ

Настоящая проблема здесь в том, что вы должны использовать дуплекс, а не преобразование. Поскольку каждый звонок _transform на самом деле буферизует данные, а не применяет к ним некоторое (/) синхронное преобразование, этот тип реализации лучше подходит в качестве дуплекса, где _write() данные буфера и вызовы _read() начните нажимать, пока не будет обнаружено противодавление.

const stream = require('stream');

// a string large enough to overflow the buffer
const S_OVERFLOW = '-'.repeat((new stream.Readable()).readableHighWaterMark+1);


class example extends stream.Duplex {
    constructor() {
        super({
            writableObjectMode: true,
        });

        // some internal queue that will be emptied once writable side ends
        Object.assign(this, {
            internal_queue: [],
        });
    }

    _write(g_chunk, s_encoding, fk_write) {
        // store chunk in internal queue
        this.internal_queue.push(g_chunk);

        // done with transform (no writes)
        fk_write();
    }

    _read() {
        console.warn('called _read()');
        let a_queue = this.internal_queue;

        // still data left in internal queue
        while(a_queue.length) {
            // remove an item from queue
            a_queue.pop();

            // intentionally overflow buffer
            if(!this.push(S_OVERFLOW)) {
                // go asynchronous
                return;
            }
        }

        console.warn('finished reading');

        // nothing more to read
        this.push(null);
    }
}


// instantiate transform
let ds_transform = new example();

// pipe to stdout
ds_transform.pipe(process.stdout);

// write some data (needs to happen twice)
ds_transform.write({
    item: 0,
});

ds_transform.write({
    item: 1,
});

// end stream
ds_transform.end();

Тогда вы получите:

$ node duplex.js > /dev/null
called _read()
called _read()
called _read()
finished reading
Другие вопросы по тегам