как создать множественный поток из исходного потока на основе некоторого фильтра в nodejs

Я хочу создать несколько потоков из одного потока на основе фильтров и хочу прослушивать эти потоки в каком-то другом модуле, который когда-либо его слушает. В основном я экспортирую поток, но я не могу слушать его в другом классе, который мне в основном нужен

У меня есть код ниже, который я сделал:

    const product_ids = require('../config').CURRENCY_PAIR
  // reading product ids in the form of an array
    const stream = require('stream');
    class UnifiedStream extends stream.Transform { // this one is the source stream that I want to transform into multiple streams
        constructor() {
                objectMode: true,
                highWaterMark: 1

    _transform(chunk, encoding, callback) {

        // console.log(chunk)



class FilterProducts extends stream.Transform { // here I am transforming  unified stream

    constructor(product_id) {

            readableObjectMode: true,
            writableObjectMode: true
        this.product_id = product_id
    _transform(chunk, encoding, callback) { //product id can be any from config file or variable pass
        if (chunk.product_id && chunk.product_id == this.product_id) {
            callback(null, callback)

let unified_stream = new UnifiedStream();

// here we are attching the stream of objects with the 
product_ids.forEach(product_id => {

    let filter_product = new FilterProducts(product_id);
     //I am pushing filter data into array

module.exports.unified_stream = unified_stream
module.exports.product_streams = product_streams

// here I am subscribing in another class but 

let streams = require('./streams/streams')

connections.on('message', data => {
    streams.unified_stream.write(data); // here I can read data
 // but I am unable to listen this one
streams.product_streams[0].on("data", (data) => {
// here error is throwing that on event is not available

0 ответов

Другие вопросы по тегам