Управление зависимостями в node.js с помощью highland.js

Я получаю огромное значение от node.js и обожаю модель потоковой обработки. Я в основном использую его для потоковой обработки с обогащением данных и ETL-подобных заданий.

Для обогащения у меня может быть такая запись...

{ "ip":"123.45.789.01", "productId": 12345 }

Я хотел бы обогатить это, возможно, добавив информацию о продукте

{ "ip":"123.45.789.01", "productId": 12345, "description" : "Coca-Cola 12Pk", "price":4.00 }

Данные для описаний и данные для цен поступают из отдельных потоков. Как лучше всего подойти к таким зависимостям в горной местности?

H = require('highland')

descriptionStream = H(['[{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}]'])
  .flatMap(JSON.parse)

priceStream = H(['[{"productId":1,"price":4.00},{"productId":2,"price":1.25}]'])
  .flatMap(JSON.parse)

#  the file is a 10G file with a json record on each line
activityStream = H(fs.createReadStream('8-11-all.json',{flags:'r',encoding:'utf8'}))
  .splitBy("\n")
  .take(100000) # just take 100k for testing
  .filter((line)-> line.trim().length > 0) # to prevent barfing on empty lines
  .doto((v)->
    # here i want to add the decription from the descriptionStream
    # and i want to add the price from the price stream.
    # in order to do that, i need to make the execution of this
    # stream dependent on the completion of the first two and
    # availability of that data.  this is easy with declarative
    # programming but less intuitive with functional programming
  )
  .toArray((results)->
    # dump my results here
  )

Какие-нибудь мысли?

2 ответа

Если вы используете highland.js, вы можете использовать .map и предоставить функцию для изменения каждого элемента.

например

var stream = _([{ "ip":"123.45.789.01", "productId": 12345 }]).map(function (x) {
   x.productName = 'Coca-Cola 12 Pack'
   return x;
});

Вот удар в этом. Это правильный подход?

H = require('highland')

# these values would come from some api/file
descriptionStream = H([{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}])
  .reduce({}, (memo,v)->
    memo[v.productId] = v;
    return memo
  )

# these values would come from some api/file
priceStream = H([{"productId":1,"price":4.00},{"productId":2,"price":1.25}])
  .reduce({}, (memo,v)->
    memo[v.productId] = v;
    return memo
  )

H([descriptionStream, priceStream])
  .series()
  .toArray((dependencies)->
    [descriptionIndex, priceIndex] = dependencies

    # these values would come from an api/file
    H([{productId:1},{productId:2}])
      .doto((v)-> v.description = descriptionIndex[v.productId].description)
      .doto((v)-> v.price = priceIndex[v.productId].price)
      .each((v)->
        console.log(JSON.stringify(v))
      )
  )

Это дает мне правильные результаты, но я не уверен, что это элегантный способ сделать потоковую зависимость. Я также предполагаю, что если вам нужен поток цен или описания более одного раза, то вы их разветвите.

{"productId":1,"description":"Coca-Cola 12Pk","price":4}
{"productId":2,"description":"Coca-Cola 20oz Bottle","price":1.25}
Другие вопросы по тегам