Куча узлов исчерпана при передаче данных JSONStream.parsed() через es.map() и JSONStream.stringify() в файловый поток
Я пытаюсь передать поток ввода (созданный из огромного файла GeoJSON) через JSONStream.parse(), чтобы разбить поток на объекты, затем через event-stream.map(), чтобы позволить мне преобразовать объект, затем через JSONStream.stringify() для создания из него строки и, наконец, для потока вывода с возможностью записи. Когда процесс запускается, я вижу, как объем памяти узла продолжает расти до тех пор, пока он не исчерпает кучу. Вот самый простой скрипт (test.js), воссоздающий проблему:
const fs = require("fs")
const es = require("event-stream")
const js = require("JSONStream")
out = fs.createWriteStream("/dev/null")
process.stdin
.pipe(js.parse("features.*"))
.pipe(es.map( function(data, cb) {
cb(null, data);
return;
} ))
.pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
.pipe(out)
Небольшой bash-скрипт (barf.sh), который изливает бесконечный поток JSON в узел process.stdin, будет постепенно увеличивать кучу узла:
#!/bin/bash
echo '{"type":"FeatureCollection","features":['
while :
do
echo '{"type":"Feature","properties":{"name":"A Street"}, "geometry":{"type":"LineString"} },'
done
запустив его так:
barf.sh | node test.js
Есть несколько любопытных способов обойти эту проблему:
- Удалите fs.createWriteStream() и измените последний этап конвейера с ".pipe(out)" на ".pipe(process.stdout)", а затем stdout узла канала на /dev/null
- Измените асинхронный es.map() на синхронный es.mapSync()
Любое из двух предыдущих действий позволит сценарию работать вечно, при этом объем памяти узла будет низким и неизменным. Я использую узел v6.3.1, поток событий v3.3.4 и JSONStream 1.1.4 на восьмиядерном компьютере с 8 ГБ ОЗУ под управлением Ubuntu 16.04.
Я надеюсь, что кто-то может помочь мне исправить то, что, я уверен, является очевидной ошибкой с моей стороны.
1 ответ
JSONStream не является потоком streams2, поэтому он не поддерживает противодавление. (Здесь есть краткое резюме о streams2.)
Это означает, что данные будут выходить из parse
поток в data
события и что поток будет продолжать выкачивать их независимо от того, готов ли поток потребления к ним. Если где-то в конвейере есть какое-то несоответствие между тем, как быстро что-то можно прочитать и записать, будет буферизация - это то, что вы видите.
Ваш barf.sh
жгут видит особенности прокачаны через stdin
, Если вместо этого вы читали массивный файл, вы должны иметь возможность управлять потоком, останавливая поток чтения файла. Так что, если вы должны были вставить некоторые pause/resume
логика в ваш map
обратный вызов, вы должны быть в состоянии заставить его обрабатывать массивный файл; это займет немного больше времени. Я бы экспериментировал с чем-то вроде этого:
let in = fs.createReadStream("/some/massive/file");
let out = fs.createWriteStream("/dev/null");
in
.pipe(js.parse("features.*"))
.pipe(es.map(function(data, cb) {
// This is just an example; a 10-millisecond wait per feature would be very slow.
if (!in.isPaused()) {
in.pause();
global.setTimeout(function () { in.resume(); }, 10);
}
cb(null, data);
return;
}))
.pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
.pipe(out);
Кстати, используя mapSync
не имеет большого значения на моем компьютере (который старый и медленный). Однако, если у вас нет какой-либо асинхронной операции для выполнения в map
Я бы пошел с mapSync
,