Как разбить большой CSV-файл, обработать его на нескольких ядрах и объединить результат в один, используя nodeJs
У меня очень большой CSV-файл (370 ГБ). У меня достаточно оперативной памяти (64 ГБ) под управлением Windows 10.
Я думаю, что следующее - лучший способ обработки данных в моей системе, но я не уверен, как это сделать.
- Я хочу разбить его на 4 разных файла CSV (потому что у меня есть четырехъядерный System).
- Затем обработайте каждый файл на разных ядрах (используя кластер).
- После обработки результат должен быть объединен в один.
В настоящее время я использую следующий код для извлечения и обработки данных:
var fs = require('fs'),
util = require('util'),
stream = require('stream'),
es = require('event-stream'),
path = require("path");
var dir = path.join(__dirname,'./ttwe.csv');
var lineNr = 0;
var s = fs.createReadStream('AIR_Pre_Processed_Data_For_EDA_16th_June_2016.csv')
.pipe(es.split())
.pipe(es.mapSync(function(line){
// find /v /c "" AIR_Pre_Processed_Data_For_EDA_16th_June_2016.csv (command to get totel no of line which gives 37931757)
s.pause();
lineNr += 1;
let ttp=line.split("^")[1].replace(/_," ");
if(ttp !='NA' && ttp !='undefined' && ttp !=''){
fs.appendFile(dir,ttp+",\n");
}
process.stdout.write('\u001B[2J\u001B[0;0f');
console.log(lineNr," of 37931757 Lines: ",parseInt((lineNr/37931757)*100),"%");
s.resume();
})
.on('error', function(e){
console.log('Error while reading file.',e);
})
.on('end', function(){
console.log('Read entire file.')
})
);
0 ответов
Есть пакет, который разбивает этот огромный файл на более мелкие:
csi-split-stream
.
Вы можете определить максимальный размер блока для каждого файла, а затем обработать их отдельно.
const csvSplitStream = require('csv-split-stream');
return csvSplitStream.split(
fs.createReadStream('input.csv'),
{
lineLimit: 100
},
(index) => fs.createWriteStream(`output-${index}.csv`)
)
.then(csvSplitResponse => {
console.log('csvSplitStream succeeded.', csvSplitResponse);
// outputs: {
// "totalChunks": 350,
// "options": {
// "delimiter": "\n",
// "lineLimit": "10000"
// }
// }
}).catch(csvSplitError => {
console.log('csvSplitStream failed!', csvSplitError);
});