Управление зависимостями в node.js с помощью highland.js
Я получаю огромное значение от node.js и обожаю модель потоковой обработки. Я в основном использую его для потоковой обработки с обогащением данных и ETL-подобных заданий.
Для обогащения у меня может быть такая запись...
{ "ip":"123.45.789.01", "productId": 12345 }
Я хотел бы обогатить это, возможно, добавив информацию о продукте
{ "ip":"123.45.789.01", "productId": 12345, "description" : "Coca-Cola 12Pk", "price":4.00 }
Данные для описаний и данные для цен поступают из отдельных потоков. Как лучше всего подойти к таким зависимостям в горной местности?
H = require('highland')
descriptionStream = H(['[{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}]'])
.flatMap(JSON.parse)
priceStream = H(['[{"productId":1,"price":4.00},{"productId":2,"price":1.25}]'])
.flatMap(JSON.parse)
# the file is a 10G file with a json record on each line
activityStream = H(fs.createReadStream('8-11-all.json',{flags:'r',encoding:'utf8'}))
.splitBy("\n")
.take(100000) # just take 100k for testing
.filter((line)-> line.trim().length > 0) # to prevent barfing on empty lines
.doto((v)->
# here i want to add the decription from the descriptionStream
# and i want to add the price from the price stream.
# in order to do that, i need to make the execution of this
# stream dependent on the completion of the first two and
# availability of that data. this is easy with declarative
# programming but less intuitive with functional programming
)
.toArray((results)->
# dump my results here
)
Какие-нибудь мысли?
2 ответа
Если вы используете highland.js, вы можете использовать .map
и предоставить функцию для изменения каждого элемента.
например
var stream = _([{ "ip":"123.45.789.01", "productId": 12345 }]).map(function (x) {
x.productName = 'Coca-Cola 12 Pack'
return x;
});
Вот удар в этом. Это правильный подход?
H = require('highland')
# these values would come from some api/file
descriptionStream = H([{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}])
.reduce({}, (memo,v)->
memo[v.productId] = v;
return memo
)
# these values would come from some api/file
priceStream = H([{"productId":1,"price":4.00},{"productId":2,"price":1.25}])
.reduce({}, (memo,v)->
memo[v.productId] = v;
return memo
)
H([descriptionStream, priceStream])
.series()
.toArray((dependencies)->
[descriptionIndex, priceIndex] = dependencies
# these values would come from an api/file
H([{productId:1},{productId:2}])
.doto((v)-> v.description = descriptionIndex[v.productId].description)
.doto((v)-> v.price = priceIndex[v.productId].price)
.each((v)->
console.log(JSON.stringify(v))
)
)
Это дает мне правильные результаты, но я не уверен, что это элегантный способ сделать потоковую зависимость. Я также предполагаю, что если вам нужен поток цен или описания более одного раза, то вы их разветвите.
{"productId":1,"description":"Coca-Cola 12Pk","price":4}
{"productId":2,"description":"Coca-Cola 20oz Bottle","price":1.25}