Highland.js для разбора CSV

Я пытаюсь написать очень функциональную манеру. Мы используем Highland.js для управления потоковой обработкой, однако, поскольку я настолько новичок, я думаю, что я действительно путаюсь с тем, как я могу справиться с этой уникальной ситуацией.

Проблема в том, что все данные в файловом потоке не согласованы. Первая строка в файле обычно является заголовком, который мы хотим сохранить в памяти и затем сжать все строки в потоке.

Вот мой первый опыт:

var _      = require('highland');
var fs     = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');

var headers = [];

var through = _.pipeline(
    _.split(),
    _.head(),
    _.doto(function(col) {
        headers = col.split(',');
        return headers;
    }),

    ......

    _.splitBy(','),
    _.zip(headers),
    _.wrapCallback(process)
);

_(stream)
    .pipe(through)
    .pipe(output);

Первая команда в конвейере - разделить файлы по строкам. Следующий захватывает заголовок, а doto объявляет его как глобальную переменную. Проблема в том, что следующие несколько строк в потоке не существуют, и поэтому процесс заблокирован... скорее всего из-за команды head() над ним.

Я попробовал несколько других вариантов, но я чувствую, что этот пример дает вам представление о том, куда мне нужно пойти с ним.

Любое руководство по этому вопросу было бы полезно - это также поднимает вопрос о том, есть ли у меня разные значения в каждой из моих строк, как я могу разделить поток процесса среди ряда различных потоковых операций переменной длины / сложности.

Благодарю.

РЕДАКТИРОВАТЬ: я добился лучшего результата, но я ставлю под сомнение эффективность этого - есть ли способ, которым я могу оптимизировать это так, чтобы при каждом запуске я не проверял, записаны ли заголовки? Это все еще кажется небрежным.

var through = _.pipeline(
    _.split(),
    _.filter(function(row) {
        // Filter out bogus values
        if (! row || headers) {
            return true;
        }
        headers = row.split(',');
        return false;
    }),
    _.map(function(row) {
        return row.split(',')
    }),
    _.batch(500),
    _.compact(),
    _.map(function(row) {
        return JSON.stringify(row) + "\n";
    })
);

_(stream)
    .pipe(through)

1 ответ

Ты можешь использовать Stream.observe() или же Stream.fork() разделить поток.

var _      = require('highland');
var fs     = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');
var through = highland.pipeline(function(s) {
    var headerStream, headers;
    // setup a shared variable to store the headers
    headers = [];
    // setup the csv processing
    s = s
        // split input into lines
        .split()
        // remove empty lines
        .compact()
        // split lines into arrays
        .map(function(row) {
            return row.split(',');
        });
    // create a new stream to grab the header
    headerStream = s.observe();
    // pause the original stream
    s.pause();
    // setup processing of the non-header rows
    s = s
        // drop the header row
        .drop(1)
        // convert the rest of the rows to objects
        .map(function(row) {
            var obj = headers.reduce(function(obj, key, i) {
                obj[key] = row[i];
                return obj;
            }, {});
            return JSON.stringify(obj) + "\n";
        });
    // grab the first row from the header stream
    // save the headers and then resume the normal stream
    headerStream.head().toArray(function(rows) {
        headers = rows[0];
        s.resume();
    });
    return s;
});
_(stream)
    .pipe(through)
    .pipe(output);

При этом ваш синтаксический анализ CSV не учитывает экранирование строк и запятых в ваших значениях. Как правило, это делается в CSV-файлах, заключая значения в двойные кавычки. И затем двойные кавычки избегают, помещая два рядом друг с другом. Немного сложно понять это правильно, поэтому я бы порекомендовал использовать пакет, который обрабатывает это, такой как fast-csv.

Тогда ваш код может выглядеть так:

var _      = require('highland');
var fs     = require('fs');
var csv    = require('fast-csv');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');

_(stream.pipe(csv({headers: true, ignoreEmpty: true})))
    .map(function(row) {
        return JSON.stringify(row) + "\n";
    })
    .pipe(output);
Другие вопросы по тегам