Как создать несколько потоков из одного потока в узле?

У меня есть данные 1 мкл, поступающие от внешнего API WebSocket, и я пытаюсь сгенерировать из них потоки 5 м 15 м 30 м 1 ч 1 д и 1 Вт. Мой текущий TransformStream может конвертировать 1 м на один из вышеупомянутых таймфреймов

Как мне это сделать

one.pipe(five)
one.pipe(fifteen)
one.pipe(thirty)

без создания нескольких экземпляров исходного потока "один". В настоящее время мой единственный экземпляр TransformStream испускает исходные данные плюс все необходимые временные рамки.

class TimeFrameGenerator extends Transform {
    constructor(timeframes) {
        super({ readableObjectMode: true, writableObjectMode: true });
        this.kline = {}

        //Consider using an object for this, looping with a for in is much faster for this case than using an array or accessing object keys
        this.timeframes = {
            "1m": 1000 * 60,
            "5m": 1000 * 60 * 5,
            "15m": 1000 * 60 * 15,
            "30m": 1000 * 60 * 30,
            "1h": 1000 * 60 * 60,
            "1d": 1000 * 60 * 60 * 24
        }
    }

    _transform(rawKline, encoding, callback) {

        this.push(rawKline);

        const { pairId, base, quote, interval, openTime, closeTime, timestamp, open, high, low, close, baseVolume, quoteVolume, isFinal, raw } = rawKline;

        if (typeof this.kline[pairId] === "undefined")
            this.kline[pairId] = {};

        for (let timeframe in this.timeframes) {
            if (timeframe === interval)
                continue;

            const millis = this.timeframes[timeframe];

            const resampledOpenTime = Math.floor(openTime / millis) * millis;
            const resampledCloseTime = Math.ceil(openTime / millis) * millis - 1;

            if (typeof this.kline[pairId][timeframe] === "undefined") {
                this.kline[pairId][timeframe] = {};
            }

            if (typeof this.kline[pairId][timeframe][resampledOpenTime] === "undefined") {
                const newKline = {
                    pairId: pairId,
                    base: base,
                    quote: quote,
                    interval: timeframe,
                    openTime: resampledOpenTime,
                    closeTime: resampledCloseTime,
                    timestamp: timestamp,
                    open: open,
                    high: high,
                    low: low,
                    close: close,
                    baseVolume: baseVolume,
                    quoteVolume: quoteVolume,
                    isFinal: false,
                    raw: null
                }

                this.kline[pairId][timeframe][resampledOpenTime] = {
                    kline: newKline,
                    prevBaseVol: 0,
                    prevQuoteVol: 0
                }
            }
            else {
                const { kline, prevBaseVol, prevQuoteVol } = this.kline[pairId][timeframe][resampledOpenTime];
                kline.high = high > kline.high ? high : kline.high;
                kline.low = low < kline.low ? low : kline.low;
                kline.close = close;
                kline.baseVolume = parseFloat((prevBaseVol + baseVolume).toFixed(PRECISION));
                kline.quoteVolume = parseFloat((prevQuoteVol + quoteVolume).toFixed(PRECISION));
            }

            if (isFinal) {
                const { prevBaseVol, prevQuoteVol } = this.kline[pairId][timeframe][resampledOpenTime];
                this.kline[pairId][timeframe][resampledOpenTime].prevBaseVol = parseFloat((prevBaseVol + baseVolume).toFixed(PRECISION));
                this.kline[pairId][timeframe][resampledOpenTime].prevQuoteVol = parseFloat((prevQuoteVol + quoteVolume).toFixed(PRECISION));
            }

            let keys = Object.keys(this.kline[pairId][timeframe]);

            if (keys.length > 1) {
                let previousTimestamp = keys.shift();
                this.kline[pairId][timeframe][previousTimestamp]['kline'].isFinal = true;
                this.push(this.kline[pairId][timeframe][previousTimestamp].kline);
                delete this.kline[pairId][timeframe][previousTimestamp];
                console.log(Object.keys(this.kline[pairId][timeframe]).length, Object.keys(this.kline[pairId]).length)
            }

            this.push(this.kline[pairId][timeframe][resampledOpenTime].kline);
        }

        callback()
    }
}

Я новичок в потоках, и некоторые идеи будут очень признательны, если я создам несколько экземпляров TransformStream, по одному на каждый таймфрейм

0 ответов

Другие вопросы по тегам