Как реализовать поток, который правильно обрабатывает противодавление в node.js?
Я не могу понять, как реализовать поток, который правильно обрабатывает противодавление. Вы никогда не должны использовать паузу и возобновить?
У меня есть эта реализация, я пытаюсь заставить работать правильно:
var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
stream.Readable.call(this, {highWaterMark: highWaterMark})
this.stream = myStream
myStream.on('readable', function() {
var data = myStream.read(5000)
//process.stdout.write("Eff: "+data)
if(data !== null) {
if(!this.push(data)) {
process.stdout.write("Pause")
this.pause()
}
callback(data)
}
}.bind(this))
myStream.on('end', function() {
this.push(null)
}.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
process.stdout.write("resume")
//this.resume() // putting this in for some reason causes the stream to not output???
}
Он правильно отправляет вывод, но не создает обратное давление. Как я могу изменить его, чтобы должным образом поддерживать противодавление?
1 ответ
Хорошо, я наконец понял это после большого количества проб и ошибок. Пара рекомендаций:
- Никогда не используйте паузу или возобновление (иначе это перейдет в унаследованный "текущий" режим)
- Никогда не добавляйте прослушиватель событий "data" (иначе он перейдет в устаревший "текущий" режим)
- Ответственность разработчика - отслеживать, когда источник читабелен.
- Ответственность разработчика - отслеживать, когда пункт назначения хочет больше данных.
- Реализация не должна читать какие-либо данные до
_read
метод называется - Аргумент к
read
сообщает источнику, что ему должно быть столько байтов, лучше передать аргумент, переданныйthis._read
в источникread
метод. Таким образом, вы должны быть в состоянии настроить, сколько читать за раз в месте назначения, а остальная часть цепочки потоков должна быть автоматической.
Вот что я изменил:
Обновление: я создал Readable, который намного проще реализовать с надлежащим обратным давлением и должен иметь такую же гибкость, как и нативные потоки узла.
var Readable = stream.Readable
var util = require('util')
// an easier Readable stream interface to implement
// requires that subclasses:
// implement a _readSource function that
// * gets the same parameter as Readable._read (size)
// * should return either data to write, or null if the source doesn't have more data yet
// call 'sourceHasData(hasData)' when the source starts or stops having data available
// calls 'end()' when the source is out of data (forever)
var Stream666 = {}
Stream666.Readable = function() {
stream.Readable.apply(this, arguments)
if(this._readSource === undefined) {
throw new Error("You must define a _readSource function for an object implementing Stream666")
}
this._sourceHasData = false
this._destinationWantsData = false
this._size = undefined // can be set by _read
}
util.inherits(Stream666.Readable, stream.Readable)
Stream666.Readable.prototype._read = function(size) {
this._destinationWantsData = true
if(this._sourceHasData) {
pushSourceData(this, size)
} else {
this._size = size
}
}
Stream666.Readable.prototype.sourceHasData = function(_sourceHasData) {
this._sourceHasData = _sourceHasData
if(_sourceHasData && this._destinationWantsData) {
pushSourceData(this, this._size)
}
}
Stream666.Readable.prototype.end = function() {
this.push(null)
}
function pushSourceData(stream666Readable, size) {
var data = stream666Readable._readSource(size)
if(data !== null) {
if(!stream666Readable.push(data)) {
stream666Readable._destinationWantsData = false
}
} else {
stream666Readable._sourceHasData = false
}
}
// creates a stream that can view all the data in a stream and passes the data through
// correctly supports backpressure
// parameters:
// stream - the stream to peek at
// callback - called when there's data sent from the passed stream
var StreamPeeker = function(myStream, callback) {
Stream666.Readable.call(this)
this.stream = myStream
this.callback = callback
myStream.on('readable', function() {
this.sourceHasData(true)
}.bind(this))
myStream.on('end', function() {
this.end()
}.bind(this))
}
util.inherits(StreamPeeker, Stream666.Readable)
StreamPeeker.prototype._readSource = function(size) {
var data = this.stream.read(size)
if(data !== null) {
this.callback(data)
return data
} else {
this.sourceHasData(false)
return null
}
}
Старый ответ:
// creates a stream that can view all the data in a stream and passes the data through
// correctly supports backpressure
// parameters:
// stream - the stream to peek at
// callback - called when there's data sent from the passed stream
var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
stream.Readable.call(this)
this.stream = myStream
this.callback = callback
this.reading = false
this.sourceIsReadable = false
myStream.on('readable', function() {
this.sourceIsReadable = true
this._readMoreData()
}.bind(this))
myStream.on('end', function() {
this.push(null)
}.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
this.reading = true
if(this.sourceIsReadable) {
this._readMoreData()
}
}
StreamPeeker.prototype._readMoreData = function() {
if(!this.reading) return;
var data = this.stream.read()
if(data !== null) {
if(!this.push(data)) {
this.reading = false
}
this.callback(data)
}
}