Как реализовать поток, который правильно обрабатывает противодавление в node.js?

Я не могу понять, как реализовать поток, который правильно обрабатывает противодавление. Вы никогда не должны использовать паузу и возобновить?

У меня есть эта реализация, я пытаюсь заставить работать правильно:

var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
    stream.Readable.call(this, {highWaterMark: highWaterMark})
    this.stream = myStream

    myStream.on('readable', function() {
        var data = myStream.read(5000)
        //process.stdout.write("Eff: "+data)
        if(data !== null) {
            if(!this.push(data)) {
                process.stdout.write("Pause")
                this.pause()
            }
            callback(data)
        }
    }.bind(this))

    myStream.on('end', function() {
        this.push(null)
    }.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
    process.stdout.write("resume")
    //this.resume() // putting this in for some reason causes the stream to not output???
}

Он правильно отправляет вывод, но не создает обратное давление. Как я могу изменить его, чтобы должным образом поддерживать противодавление?

1 ответ

Хорошо, я наконец понял это после большого количества проб и ошибок. Пара рекомендаций:

  • Никогда не используйте паузу или возобновление (иначе это перейдет в унаследованный "текущий" режим)
  • Никогда не добавляйте прослушиватель событий "data" (иначе он перейдет в устаревший "текущий" режим)
  • Ответственность разработчика - отслеживать, когда источник читабелен.
  • Ответственность разработчика - отслеживать, когда пункт назначения хочет больше данных.
  • Реализация не должна читать какие-либо данные до _read метод называется
  • Аргумент к read сообщает источнику, что ему должно быть столько байтов, лучше передать аргумент, переданный this._read в источник read метод. Таким образом, вы должны быть в состоянии настроить, сколько читать за раз в месте назначения, а остальная часть цепочки потоков должна быть автоматической.

Вот что я изменил:

Обновление: я создал Readable, который намного проще реализовать с надлежащим обратным давлением и должен иметь такую ​​же гибкость, как и нативные потоки узла.

var Readable = stream.Readable
var util = require('util')

// an easier Readable stream interface to implement
// requires that subclasses:
    // implement a _readSource function that
        // * gets the same parameter as Readable._read (size)
        // * should return either data to write, or null if the source doesn't have more data yet
    // call 'sourceHasData(hasData)' when the source starts or stops having data available
    // calls 'end()' when the source is out of data (forever)
var Stream666 = {}
Stream666.Readable = function() {
    stream.Readable.apply(this, arguments)
    if(this._readSource === undefined) {
        throw new Error("You must define a _readSource function for an object implementing Stream666")
    }

    this._sourceHasData = false
    this._destinationWantsData = false
    this._size = undefined // can be set by _read
}
util.inherits(Stream666.Readable, stream.Readable)
Stream666.Readable.prototype._read = function(size) {
    this._destinationWantsData = true
    if(this._sourceHasData) {
        pushSourceData(this, size)
    } else {
        this._size = size
    }
}
Stream666.Readable.prototype.sourceHasData = function(_sourceHasData) {
    this._sourceHasData = _sourceHasData
    if(_sourceHasData && this._destinationWantsData) {
        pushSourceData(this, this._size)
    }
}
Stream666.Readable.prototype.end = function() {
    this.push(null)
}
function pushSourceData(stream666Readable, size) {
    var data = stream666Readable._readSource(size)
    if(data !== null) {
        if(!stream666Readable.push(data)) {
            stream666Readable._destinationWantsData = false
        }
    } else {
        stream666Readable._sourceHasData = false
    }
}    

// creates a stream that can view all the data in a stream and passes the data through
// correctly supports backpressure
// parameters:
    // stream - the stream to peek at
    // callback - called when there's data sent from the passed stream
var StreamPeeker = function(myStream, callback) {
    Stream666.Readable.call(this)
    this.stream = myStream
    this.callback = callback

    myStream.on('readable', function() {
        this.sourceHasData(true)
    }.bind(this))
    myStream.on('end', function() {
        this.end()
    }.bind(this))
}
util.inherits(StreamPeeker, Stream666.Readable)
StreamPeeker.prototype._readSource = function(size) {
    var data = this.stream.read(size)
    if(data !== null) {
        this.callback(data)
        return data
    } else {
        this.sourceHasData(false)
        return null
    }
}

Старый ответ:

// creates a stream that can view all the data in a stream and passes the data through
// correctly supports backpressure
// parameters:
    // stream - the stream to peek at
    // callback - called when there's data sent from the passed stream
var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
    stream.Readable.call(this)
    this.stream = myStream
    this.callback = callback
    this.reading = false
    this.sourceIsReadable = false

    myStream.on('readable', function() {
        this.sourceIsReadable = true
        this._readMoreData()
    }.bind(this))

    myStream.on('end', function() {
        this.push(null)
    }.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
    this.reading = true
    if(this.sourceIsReadable) {
        this._readMoreData()
    }
}
StreamPeeker.prototype._readMoreData = function() {
    if(!this.reading) return;

    var data = this.stream.read()
    if(data !== null) {
        if(!this.push(data)) {
            this.reading = false
        }
        this.callback(data)
    }
}
Другие вопросы по тегам