Данные не преобразуются Node.js Преобразование потоков

Я пытаюсь сделать поток преобразования потока, который принимает данные из socket.io, преобразовав его в JSON, а затем отправив в stdout. Я полностью озадачен тем, почему данные просто проходят без каких-либо преобразований. Я использую through2 библиотека. Вот мой код:

getStreamNames().then(streamNames => {
        const socket = io(SOCKETIO_URL);
        socket.on('connect', () => {
            socket.emit('Subscribe', {subs: streamNames});
        });

        const stream = through2.obj(function (chunk, enc, callback) {
            callback(null, parseString(chunk))
        }).pipe(through2.obj(function (chunk, enc, callback) {
            callback(null, JSON.stringify(chunk));
        })).pipe(process.stdout);

        socket.on('m', data => stream.write(data));

    },
);

getStreamNames возвращает обещание, которое разрешается в массив имен потоков (я вызываю внешний socket.io API) и parseString берет строку, возвращенную из API, и преобразует ее в JSON, чтобы она была управляемой.

То, что я ищу, это моя консоль, чтобы распечатать stringify'd JSON после того, как я проанализирую его, используя parseString а затем сделать его стандартным с JSON.stringify, Что на самом деле происходит, так это то, что данные проходят через поток и не преобразуются.

Для справки, данные, поступающие из API, находятся в странном формате, что-то вроде

field1~field2~0x23~fieldn

и вот почему мне нужно parseString метод.

Я должен что-то упустить. Есть идеи?

РЕДАКТИРОВАТЬ:

parseString:

function(value) {
    var valuesArray = value.split("~");
    var valuesArrayLenght = valuesArray.length;
    var mask = valuesArray[valuesArrayLenght - 1];
    var maskInt = parseInt(mask, 16);
    var unpackedCurrent = {};
    var currentField = 0;
    for (var property in this.FIELDS) {
        if (this.FIELDS[property] === 0) {
            unpackedCurrent[property] = valuesArray[currentField];
            currentField++;
        }
        else if (maskInt & this.FIELDS[property]) {
            if (property === 'LASTMARKET') {
                unpackedCurrent[property] = valuesArray[currentField];
            }
            else {
                unpackedCurrent[property] = parseFloat(valuesArray[currentField]);
            }
            currentField++;
        }
    }

    return unpackedCurrent;
};

Спасибо

1 ответ

Решение

Проблема в том, что поток, который вы пишете, на самом деле process.stdout, так как .pipe возвращает последний stream.Writableтак что вы можете продолжать цепочку, в вашем случае, process.stdout,

const x = stream.pipe(stream2).pipe(stream3).pipe(process.stdout);
x === process.stdout // true

Итак, все, что вы делали, было: process.stdout.write(data) не пройдя по трубопроводу.

Что вам нужно сделать, это назначить свой первый through2 поток к stream переменная, а затем .pipe в этом потоке.

const stream = through2.obj((chunk, enc, callback) => {
    callback(null, parseString(chunk))
});

stream
    .pipe(through2.obj((chunk, enc, callback) => {
        callback(null, JSON.stringify(chunk));
    }))
    .pipe(process.stdout);

socket.on('m', data => stream.write(data));
Другие вопросы по тегам