Как создать несколько потоков из одного потока в узле?
У меня есть данные 1 мкл, поступающие от внешнего API WebSocket, и я пытаюсь сгенерировать из них потоки 5 м 15 м 30 м 1 ч 1 д и 1 Вт. Мой текущий TransformStream может конвертировать 1 м на один из вышеупомянутых таймфреймов
Как мне это сделать
one.pipe(five)
one.pipe(fifteen)
one.pipe(thirty)
без создания нескольких экземпляров исходного потока "один". В настоящее время мой единственный экземпляр TransformStream испускает исходные данные плюс все необходимые временные рамки.
class TimeFrameGenerator extends Transform {
constructor(timeframes) {
super({ readableObjectMode: true, writableObjectMode: true });
this.kline = {}
//Consider using an object for this, looping with a for in is much faster for this case than using an array or accessing object keys
this.timeframes = {
"1m": 1000 * 60,
"5m": 1000 * 60 * 5,
"15m": 1000 * 60 * 15,
"30m": 1000 * 60 * 30,
"1h": 1000 * 60 * 60,
"1d": 1000 * 60 * 60 * 24
}
}
_transform(rawKline, encoding, callback) {
this.push(rawKline);
const { pairId, base, quote, interval, openTime, closeTime, timestamp, open, high, low, close, baseVolume, quoteVolume, isFinal, raw } = rawKline;
if (typeof this.kline[pairId] === "undefined")
this.kline[pairId] = {};
for (let timeframe in this.timeframes) {
if (timeframe === interval)
continue;
const millis = this.timeframes[timeframe];
const resampledOpenTime = Math.floor(openTime / millis) * millis;
const resampledCloseTime = Math.ceil(openTime / millis) * millis - 1;
if (typeof this.kline[pairId][timeframe] === "undefined") {
this.kline[pairId][timeframe] = {};
}
if (typeof this.kline[pairId][timeframe][resampledOpenTime] === "undefined") {
const newKline = {
pairId: pairId,
base: base,
quote: quote,
interval: timeframe,
openTime: resampledOpenTime,
closeTime: resampledCloseTime,
timestamp: timestamp,
open: open,
high: high,
low: low,
close: close,
baseVolume: baseVolume,
quoteVolume: quoteVolume,
isFinal: false,
raw: null
}
this.kline[pairId][timeframe][resampledOpenTime] = {
kline: newKline,
prevBaseVol: 0,
prevQuoteVol: 0
}
}
else {
const { kline, prevBaseVol, prevQuoteVol } = this.kline[pairId][timeframe][resampledOpenTime];
kline.high = high > kline.high ? high : kline.high;
kline.low = low < kline.low ? low : kline.low;
kline.close = close;
kline.baseVolume = parseFloat((prevBaseVol + baseVolume).toFixed(PRECISION));
kline.quoteVolume = parseFloat((prevQuoteVol + quoteVolume).toFixed(PRECISION));
}
if (isFinal) {
const { prevBaseVol, prevQuoteVol } = this.kline[pairId][timeframe][resampledOpenTime];
this.kline[pairId][timeframe][resampledOpenTime].prevBaseVol = parseFloat((prevBaseVol + baseVolume).toFixed(PRECISION));
this.kline[pairId][timeframe][resampledOpenTime].prevQuoteVol = parseFloat((prevQuoteVol + quoteVolume).toFixed(PRECISION));
}
let keys = Object.keys(this.kline[pairId][timeframe]);
if (keys.length > 1) {
let previousTimestamp = keys.shift();
this.kline[pairId][timeframe][previousTimestamp]['kline'].isFinal = true;
this.push(this.kline[pairId][timeframe][previousTimestamp].kline);
delete this.kline[pairId][timeframe][previousTimestamp];
console.log(Object.keys(this.kline[pairId][timeframe]).length, Object.keys(this.kline[pairId]).length)
}
this.push(this.kline[pairId][timeframe][resampledOpenTime].kline);
}
callback()
}
}
Я новичок в потоках, и некоторые идеи будут очень признательны, если я создам несколько экземпляров TransformStream, по одному на каждый таймфрейм