Узловые потоки - Нажатие на чтение непреднамеренно разделяет на 3 потока записи
Цель: объекты будут помещены в читаемый поток, а затем сохранены в отдельный файл.csv в зависимости от того, с какого канала (электронная почта, push, In-App) они поступают.
Проблема: я не могу разделить потоки на разные "строки".pipe(), поэтому все журналы.csv получают только объекты событий, относящиеся к их каналам. Но в текущей итерации все CSV-файлы, созданные Writestream, получают объекты событий со всех каналов.
Вопросы: Могу ли я динамически создать многоканальные "линии конвейера" в функции setup () программным способом или это правильный подход?
Является ли это ручное создание "pipe() lines" причиной того, что все CSV-файлы заполняются событиями? Можно ли это решить с помощью одной "pipe() line" и динамической маршрутизации?
Краткое объяснение кода ниже:
setup () вызывает makeStreams () - создает объект с доступными для чтения и записи (вращающийся поток файловой системы с возможностью записи) (setup() сейчас является ненужной функцией, но позже будет выполнять больше задач по настройке.)
pushStream () вызывается, когда происходит входящее событие, и толкает объект, например: {Email: {queryParam:1, queryParam:2 и т. д.}} Событие сортируется по объекту самого высокого уровня (в данном случае "Email") и затем передается в правильный поток записи, который теоретически должен быть перенесен в правильный поток записи. К сожалению, это не тот случай, он отправляет объект события во все доступные для записи потоки. Как я могу отправить его только в правильный поток?
КОД:
const Readable = require('stream').Readable
const Json2csvTransform = require('json2csv').Transform;
var rfs = require("rotating-file-stream");
const channelTypes = ['Push Notification', 'Email', 'In-app Message']
var streamArr = setup(channelTypes);
const opts = {};
const transformOpts = {
objectMode: true
};
const json2csv = new Json2csvTransform(opts, transformOpts);
function setup(list) {
console.log("Setting up streams...")
streamArr = makeStreams(list) //makes streams out of each endpoint
return streamArr
}
//Stream Builder for Logging Based Upon Channel Name
function makeStreams(listArray) {
listArray = ['Push Notification', 'Email', 'In-app Message']
var length = listArray.length
var streamObjs = {}
for (var name = 0; name < length; name++) {
var fileName = listArray[name] + '.csv'
const readStream = new Readable({
objectMode: true,
read() {}
})
const writeStream = rfs(fileName, {
size: "50M", // rotate every 50 MegaBytes written
interval: "1d" // rotate daily
//compress: "gzip" // compress rotated files
});
var objName = listArray[name]
var obj = {
instream: readStream,
outstream: writeStream
}
streamObjs[objName] = obj
}
return streamObjs
}
function pushStream(obj) {
var keys = Object.keys(obj)
if (streamArr[keys]) {
streamArr[keys].instream.push(obj[keys])
} else {
console.log("event without a matching channel error")
}
}
//Had to make each pipe line here manually. Can this be improved? Is it the reason all of the files are receiving all events?
streamArr['Email'].instream.pipe(json2csv).pipe(streamArr['Email'].outstream)
streamArr['In-app Message'].instream.pipe(json2csv).pipe(streamArr['In-app Message'].outstream)
streamArr['Push Notification'].instream.pipe(json2csv).pipe(streamArr['Push Notification'].outstream)
module.exports = {
makeStreams,
pushStream,
setup
}