Данные не преобразуются Node.js Преобразование потоков
Я пытаюсь сделать поток преобразования потока, который принимает данные из socket.io
, преобразовав его в JSON, а затем отправив в stdout. Я полностью озадачен тем, почему данные просто проходят без каких-либо преобразований. Я использую through2
библиотека. Вот мой код:
getStreamNames().then(streamNames => {
const socket = io(SOCKETIO_URL);
socket.on('connect', () => {
socket.emit('Subscribe', {subs: streamNames});
});
const stream = through2.obj(function (chunk, enc, callback) {
callback(null, parseString(chunk))
}).pipe(through2.obj(function (chunk, enc, callback) {
callback(null, JSON.stringify(chunk));
})).pipe(process.stdout);
socket.on('m', data => stream.write(data));
},
);
getStreamNames
возвращает обещание, которое разрешается в массив имен потоков (я вызываю внешний socket.io
API) и parseString
берет строку, возвращенную из API, и преобразует ее в JSON, чтобы она была управляемой.
То, что я ищу, это моя консоль, чтобы распечатать stringify'd JSON после того, как я проанализирую его, используя parseString
а затем сделать его стандартным с JSON.stringify
, Что на самом деле происходит, так это то, что данные проходят через поток и не преобразуются.
Для справки, данные, поступающие из API, находятся в странном формате, что-то вроде
field1~field2~0x23~fieldn
и вот почему мне нужно parseString
метод.
Я должен что-то упустить. Есть идеи?
РЕДАКТИРОВАТЬ:
parseString:
function(value) {
var valuesArray = value.split("~");
var valuesArrayLenght = valuesArray.length;
var mask = valuesArray[valuesArrayLenght - 1];
var maskInt = parseInt(mask, 16);
var unpackedCurrent = {};
var currentField = 0;
for (var property in this.FIELDS) {
if (this.FIELDS[property] === 0) {
unpackedCurrent[property] = valuesArray[currentField];
currentField++;
}
else if (maskInt & this.FIELDS[property]) {
if (property === 'LASTMARKET') {
unpackedCurrent[property] = valuesArray[currentField];
}
else {
unpackedCurrent[property] = parseFloat(valuesArray[currentField]);
}
currentField++;
}
}
return unpackedCurrent;
};
Спасибо
1 ответ
Проблема в том, что поток, который вы пишете, на самом деле process.stdout
, так как .pipe
возвращает последний stream.Writable
так что вы можете продолжать цепочку, в вашем случае, process.stdout
,
const x = stream.pipe(stream2).pipe(stream3).pipe(process.stdout);
x === process.stdout // true
Итак, все, что вы делали, было: process.stdout.write(data)
не пройдя по трубопроводу.
Что вам нужно сделать, это назначить свой первый through2
поток к stream
переменная, а затем .pipe
в этом потоке.
const stream = through2.obj((chunk, enc, callback) => {
callback(null, parseString(chunk))
});
stream
.pipe(through2.obj((chunk, enc, callback) => {
callback(null, JSON.stringify(chunk));
}))
.pipe(process.stdout);
socket.on('m', data => stream.write(data));