Узел - Абстрагирование шагов трубы в функцию

Я знаком с потоками Node, но я борюсь за лучшие практики для абстрагирования кода, которые я многократно использую в одном шаге конвейера.

Вот урезанная версия того, что я пишу сегодня:

inputStream
.pipe(csv.parse({columns:true})
.pipe(csv.transform(function(row) {return transform(row); }))
.pipe(csv.stringify({header: true})
.pipe(outputStream);

Фактическая работа происходит в transform(), Единственные вещи, которые действительно меняются, это inputStream, transform(), а также outputStream, Как я уже сказал, это урезанная версия того, что я на самом деле использую. У меня много обработки ошибок и регистрации на каждом шаге конвейера, поэтому в конечном итоге я пытаюсь абстрагировать код.

То, что я хочу написать, это один шаг трубы, например:

inputStream
.pipe(csvFunction(transform(row)))
.pipe(outputStream);

Я пытаюсь понять, как превратить эти шаги в единую функцию, которая принимает поток и возвращает поток. Я смотрел на библиотеки, например, сквозь 2, но я не уверен, как это приведет меня туда, куда я пытаюсь пойти.

2 ответа

Решение

Вот что я закончил Я использовал библиотеку through2 и API потоковой передачи библиотеки csv для создания нужной функции pipe.

var csv = require('csv');
    through = require('through2');

module.exports = function(transformFunc) {
    parser = csv.parse({columns:true, relax_column_count:true}),
    transformer = csv.transform(function(row) {
        return transformFunc(row);
    }),
    stringifier = csv.stringify({header: true});

    return through(function(chunk,enc,cb){
        var stream = this;

            parser.on('data', function(data){
                transformer.write(data);
            });

            transformer.on('data', function(data){
                stringifier.write(data);
            });

            stringifier.on('data', function(data){
                stream.push(data);
            });

            parser.write(chunk);

            parser.removeAllListeners('data');
            transformer.removeAllListeners('data');
            stringifier.removeAllListeners('data');
            cb();
    })
}

Стоит отметить ту часть, где я удаляю слушателей событий в конце, это было связано с ошибками памяти, когда я создал слишком много слушателей событий. Я изначально пытался решить эту проблему, слушая события с once, но это предотвратило чтение последующих кусков и переход к следующему шагу конвейера.

Дайте мне знать, если у кого-то есть отзывы или дополнительные идеи.

Вы можете использовать PassThrough класс как это:

var PassThrough = require('stream').PassThrough;

var csvStream = new PassThrough();
csvStream.on('pipe', function (source) {
  // undo piping of source
  source.unpipe(this);
  // build own pipe-line and store internally
  this.combinedStream =
    source.pipe(csv.parse({columns: true}))
      .pipe(csv.transform(function (row) {
        return transform(row);
      }))
      .pipe(csv.stringify({header: true}));
});

csvStream.pipe = function (dest, options) {
  // pipe internal combined stream to dest
  return this.combinedStream.pipe(dest, options);
};

inputStream
  .pipe(csvStream)
  .pipe(outputStream);
Другие вопросы по тегам