как создать множественный поток из исходного потока на основе некоторого фильтра в nodejs

Я хочу создать несколько потоков из одного потока на основе фильтров и хочу прослушивать эти потоки в каком-то другом модуле, который когда-либо его слушает. В основном я экспортирую поток, но я не могу слушать его в другом классе, который мне в основном нужен

У меня есть код ниже, который я сделал:

    const product_ids = require('../config').CURRENCY_PAIR
  // reading product ids in the form of an array
    const stream = require('stream');
    class UnifiedStream extends stream.Transform { // this one is the source stream that I want to transform into multiple streams
        constructor() {
            super({
                objectMode: true,
                highWaterMark: 1
            });

        }
    _transform(chunk, encoding, callback) {

        // console.log(chunk)

        this.push(chunk);
        callback()

    }
}

class FilterProducts extends stream.Transform { // here I am transforming  unified stream

    constructor(product_id) {

        super({
            readableObjectMode: true,
            writableObjectMode: true
        });
        this.product_id = product_id
    }
    _transform(chunk, encoding, callback) { //product id can be any from config file or variable pass
        console.log(this.product_id)
        if (chunk.product_id && chunk.product_id == this.product_id) {
            this.push(chunk);
            callback(null, callback)
        }
    }
}



let unified_stream = new UnifiedStream();

// here we are attching the stream of objects with the 
product_ids.forEach(product_id => {

    let filter_product = new FilterProducts(product_id);
    unified_stream.pipe(filter_products)
    product_streams.push( 
     //I am pushing filter data into array
        filter_product
    )


})
module.exports.unified_stream = unified_stream
module.exports.product_streams = product_streams

// here I am subscribing in another class but 

let streams = require('./streams/streams')


connections.on('message', data => {
    streams.unified_stream.write(data); // here I can read data
})
 // but I am unable to listen this one
streams.product_streams[0].on("data", (data) => {
    console.log(data) 
// here error is throwing that on event is not available
})

0 ответов

Другие вопросы по тегам