Куча узлов исчерпана при передаче данных JSONStream.parsed() через es.map() и JSONStream.stringify() в файловый поток

Я пытаюсь передать поток ввода (созданный из огромного файла GeoJSON) через JSONStream.parse(), чтобы разбить поток на объекты, затем через event-stream.map(), чтобы позволить мне преобразовать объект, затем через JSONStream.stringify() для создания из него строки и, наконец, для потока вывода с возможностью записи. Когда процесс запускается, я вижу, как объем памяти узла продолжает расти до тех пор, пока он не исчерпает кучу. Вот самый простой скрипт (test.js), воссоздающий проблему:

const fs = require("fs")
const es = require("event-stream")
const js = require("JSONStream")

out = fs.createWriteStream("/dev/null")
process.stdin
    .pipe(js.parse("features.*"))
    .pipe(es.map( function(data, cb) { 
        cb(null, data);
        return;
    } ))
    .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
    .pipe(out)

Небольшой bash-скрипт (barf.sh), который изливает бесконечный поток JSON в узел process.stdin, будет постепенно увеличивать кучу узла:

#!/bin/bash

echo '{"type":"FeatureCollection","features":['
while :
do
    echo '{"type":"Feature","properties":{"name":"A Street"}, "geometry":{"type":"LineString"} },'
done

запустив его так:

barf.sh | node test.js

Есть несколько любопытных способов обойти эту проблему:

  • Удалите fs.createWriteStream() и измените последний этап конвейера с ".pipe(out)" на ".pipe(process.stdout)", а затем stdout узла канала на /dev/null
  • Измените асинхронный es.map() на синхронный es.mapSync()

Любое из двух предыдущих действий позволит сценарию работать вечно, при этом объем памяти узла будет низким и неизменным. Я использую узел v6.3.1, поток событий v3.3.4 и JSONStream 1.1.4 на восьмиядерном компьютере с 8 ГБ ОЗУ под управлением Ubuntu 16.04.

Я надеюсь, что кто-то может помочь мне исправить то, что, я уверен, является очевидной ошибкой с моей стороны.

1 ответ

Решение

JSONStream не является потоком streams2, поэтому он не поддерживает противодавление. (Здесь есть краткое резюме о streams2.)

Это означает, что данные будут выходить из parse поток в data события и что поток будет продолжать выкачивать их независимо от того, готов ли поток потребления к ним. Если где-то в конвейере есть какое-то несоответствие между тем, как быстро что-то можно прочитать и записать, будет буферизация - это то, что вы видите.

Ваш barf.sh жгут видит особенности прокачаны через stdin, Если вместо этого вы читали массивный файл, вы должны иметь возможность управлять потоком, останавливая поток чтения файла. Так что, если вы должны были вставить некоторые pause/resume логика в ваш map обратный вызов, вы должны быть в состоянии заставить его обрабатывать массивный файл; это займет немного больше времени. Я бы экспериментировал с чем-то вроде этого:

let in = fs.createReadStream("/some/massive/file");
let out = fs.createWriteStream("/dev/null");
in
    .pipe(js.parse("features.*"))
    .pipe(es.map(function(data, cb) {
        // This is just an example; a 10-millisecond wait per feature would be very slow.
        if (!in.isPaused()) {
            in.pause();
            global.setTimeout(function () { in.resume(); }, 10);
        }
        cb(null, data);
        return;
    }))
    .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
    .pipe(out);

Кстати, используя mapSync не имеет большого значения на моем компьютере (который старый и медленный). Однако, если у вас нет какой-либо асинхронной операции для выполнения в map Я бы пошел с mapSync,

Другие вопросы по тегам