Как разбить большой CSV-файл, обработать его на нескольких ядрах и объединить результат в один, используя nodeJs

У меня очень большой CSV-файл (370 ГБ). У меня достаточно оперативной памяти (64 ГБ) под управлением Windows 10.

Я думаю, что следующее - лучший способ обработки данных в моей системе, но я не уверен, как это сделать.

  1. Я хочу разбить его на 4 разных файла CSV (потому что у меня есть четырехъядерный System).
  2. Затем обработайте каждый файл на разных ядрах (используя кластер).
  3. После обработки результат должен быть объединен в один.

В настоящее время я использую следующий код для извлечения и обработки данных:

var fs = require('fs'), 
    util = require('util'), 
    stream = require('stream'), 
    es = require('event-stream'),
    path = require("path");
var dir = path.join(__dirname,'./ttwe.csv');


var lineNr = 0;

var s = fs.createReadStream('AIR_Pre_Processed_Data_For_EDA_16th_June_2016.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // find /v /c "" AIR_Pre_Processed_Data_For_EDA_16th_June_2016.csv (command to get totel no of line which gives 37931757)
        s.pause();

        lineNr += 1;
        let ttp=line.split("^")[1].replace(/_," ");
        if(ttp !='NA' && ttp !='undefined' && ttp !=''){
            fs.appendFile(dir,ttp+",\n");
        }
        process.stdout.write('\u001B[2J\u001B[0;0f');
        console.log(lineNr," of 37931757 Lines: ",parseInt((lineNr/37931757)*100),"%");

        s.resume();
    })
    .on('error', function(e){
        console.log('Error while reading file.',e);
    })
    .on('end', function(){
        console.log('Read entire file.')
    })
);

0 ответов

Есть пакет, который разбивает этот огромный файл на более мелкие: csi-split-stream.

Вы можете определить максимальный размер блока для каждого файла, а затем обработать их отдельно.

const csvSplitStream = require('csv-split-stream');


return csvSplitStream.split(
  fs.createReadStream('input.csv'),
  {
    lineLimit: 100
  },
  (index) => fs.createWriteStream(`output-${index}.csv`)
)
.then(csvSplitResponse => {
  console.log('csvSplitStream succeeded.', csvSplitResponse);
  // outputs: {
  //  "totalChunks": 350,
  //  "options": {
  //    "delimiter": "\n",
  //    "lineLimit": "10000"
  //  }
  // }
}).catch(csvSplitError => {
  console.log('csvSplitStream failed!', csvSplitError);
});

получил это здесь

Другие вопросы по тегам