Узел - Абстрагирование шагов трубы в функцию
Я знаком с потоками Node, но я борюсь за лучшие практики для абстрагирования кода, которые я многократно использую в одном шаге конвейера.
Вот урезанная версия того, что я пишу сегодня:
inputStream
.pipe(csv.parse({columns:true})
.pipe(csv.transform(function(row) {return transform(row); }))
.pipe(csv.stringify({header: true})
.pipe(outputStream);
Фактическая работа происходит в transform()
, Единственные вещи, которые действительно меняются, это inputStream
, transform()
, а также outputStream
, Как я уже сказал, это урезанная версия того, что я на самом деле использую. У меня много обработки ошибок и регистрации на каждом шаге конвейера, поэтому в конечном итоге я пытаюсь абстрагировать код.
То, что я хочу написать, это один шаг трубы, например:
inputStream
.pipe(csvFunction(transform(row)))
.pipe(outputStream);
Я пытаюсь понять, как превратить эти шаги в единую функцию, которая принимает поток и возвращает поток. Я смотрел на библиотеки, например, сквозь 2, но я не уверен, как это приведет меня туда, куда я пытаюсь пойти.
2 ответа
Вот что я закончил Я использовал библиотеку through2 и API потоковой передачи библиотеки csv для создания нужной функции pipe.
var csv = require('csv');
through = require('through2');
module.exports = function(transformFunc) {
parser = csv.parse({columns:true, relax_column_count:true}),
transformer = csv.transform(function(row) {
return transformFunc(row);
}),
stringifier = csv.stringify({header: true});
return through(function(chunk,enc,cb){
var stream = this;
parser.on('data', function(data){
transformer.write(data);
});
transformer.on('data', function(data){
stringifier.write(data);
});
stringifier.on('data', function(data){
stream.push(data);
});
parser.write(chunk);
parser.removeAllListeners('data');
transformer.removeAllListeners('data');
stringifier.removeAllListeners('data');
cb();
})
}
Стоит отметить ту часть, где я удаляю слушателей событий в конце, это было связано с ошибками памяти, когда я создал слишком много слушателей событий. Я изначально пытался решить эту проблему, слушая события с once
, но это предотвратило чтение последующих кусков и переход к следующему шагу конвейера.
Дайте мне знать, если у кого-то есть отзывы или дополнительные идеи.
Вы можете использовать PassThrough
класс как это:
var PassThrough = require('stream').PassThrough;
var csvStream = new PassThrough();
csvStream.on('pipe', function (source) {
// undo piping of source
source.unpipe(this);
// build own pipe-line and store internally
this.combinedStream =
source.pipe(csv.parse({columns: true}))
.pipe(csv.transform(function (row) {
return transform(row);
}))
.pipe(csv.stringify({header: true}));
});
csvStream.pipe = function (dest, options) {
// pipe internal combined stream to dest
return this.combinedStream.pipe(dest, options);
};
inputStream
.pipe(csvStream)
.pipe(outputStream);