как создать множественный поток из исходного потока на основе некоторого фильтра в nodejs
Я хочу создать несколько потоков из одного потока на основе фильтров и хочу прослушивать эти потоки в каком-то другом модуле, который когда-либо его слушает. В основном я экспортирую поток, но я не могу слушать его в другом классе, который мне в основном нужен
У меня есть код ниже, который я сделал:
const product_ids = require('../config').CURRENCY_PAIR
// reading product ids in the form of an array
const stream = require('stream');
class UnifiedStream extends stream.Transform { // this one is the source stream that I want to transform into multiple streams
constructor() {
super({
objectMode: true,
highWaterMark: 1
});
}
_transform(chunk, encoding, callback) {
// console.log(chunk)
this.push(chunk);
callback()
}
}
class FilterProducts extends stream.Transform { // here I am transforming unified stream
constructor(product_id) {
super({
readableObjectMode: true,
writableObjectMode: true
});
this.product_id = product_id
}
_transform(chunk, encoding, callback) { //product id can be any from config file or variable pass
console.log(this.product_id)
if (chunk.product_id && chunk.product_id == this.product_id) {
this.push(chunk);
callback(null, callback)
}
}
}
let unified_stream = new UnifiedStream();
// here we are attching the stream of objects with the
product_ids.forEach(product_id => {
let filter_product = new FilterProducts(product_id);
unified_stream.pipe(filter_products)
product_streams.push(
//I am pushing filter data into array
filter_product
)
})
module.exports.unified_stream = unified_stream
module.exports.product_streams = product_streams
// here I am subscribing in another class but
let streams = require('./streams/streams')
connections.on('message', data => {
streams.unified_stream.write(data); // here I can read data
})
// but I am unable to listen this one
streams.product_streams[0].on("data", (data) => {
console.log(data)
// here error is throwing that on event is not available
})