ПапаПарс и Хайленд

Мне нужно проанализировать очень большой CSV-файл в NodeJS и сохранить его в базе данных (асинхронная операция), которая допускает до 500 записей одновременно. Из-за ограничений памяти я должен передавать файл CSV и использовать PapaParse для анализа файла CSV (как это лучше всего работает в моем случае).

Поскольку PapaParse использует подход стиля обратного вызова для разбора потоков Node.js, я не видел простого сочетания highland (для пакетной обработки и преобразования данных) и PapaParse. Итак, я попытался использовать поток ParseThrough для записи данных и чтения этого потока с помощью highland для пакетной обработки:

const csv = require('papaparse');
const fs = require('fs');
const highland = require('highland');
const { PassThrough } = require('stream');

const passThroughStream = new PassThrough({ objectMode: true });

csv.parse(fileStream, {
  step: function(row) {
    // Write data to stream
    passThroughStream.write(row.data[0]);
  },
  complete: function() {
    // Somehow "end" the stream
    passThroughStream.write(null);
  },
});

highland(passThroughStream)
  .map((data) => {
    // data transform
  })
  .batch(500)
  .map((data) => {
    // Save up to 500 entries in database (async call)
  });

Очевидно, что это не работает как есть и ничего не делает на самом деле. Возможно ли что-то подобное или даже лучший способ для анализа очень больших файлов CSV и сохранения строк в базе данных (в пакетах до 500)?

Изменить: используя csv пакет ( https://www.npmjs.com/package/csv) это можно было бы сделать так (то же самое для fast-csv):

highland(fileStream.pipe(csv.parse()))
  .map((data) => {
    // data transform
  })
  .batch(500)
  .map((data) => {
    // Save up to 500 entries in database (async call)
  });

Но, к сожалению, оба пакета NPM не анализируют файлы CSV должным образом во всех случаях.

1 ответ

Решение

После быстрого взгляда на papaparse Я решил реализовать парсер CSV в scramjet,

fileStream.pipe(new scramjet.StringStream('utf-8'))
    .csvParse(options)
    .batch(500)
    .map(items => db.insertArray('some_table', items))

Я надеюсь, что это работает для вас.:)

Другие вопросы по тегам