Highland добавить в поток, прежде чем полностью потребляется?

Есть способ добавить в поток высокогорья законченное сообщение с полным описанием наших потребностей?

Меньше предположим, что мы имеем следующую ситуацию:

const stream = high([1,4,6,7])

Затем с этим потоком я хочу подсчитать каждое из обрабатываемых значений и сказать

sink.drain(stream.pipe(4))

Будучи 4 число элементов массива. Учтите, что это могут быть тысячи объектов в потоке, и мне нужно потреблять из потока, чтобы можно было считать.

Я не могу сказать array.length, потому что это источник, который может содержать любую информацию, и эта информация обрабатывается потоком... Как я могу добавить в поток конец сообщения с описанием того, что было использовано?

1 ответ

Решение

Похоже, вы хотите установить какое-то состояние вокруг ваших значений, но не мешая потреблению ваших значений. Я хотел бы предложить посмотреть на некоторые решения, которые включают h.through,

Вы можете разделить поток с fork или же observe и уменьшить некоторое состояние из значений:

h([1, 2, 3, 4])
  .through(stream => {
    const length = stream.observe()
      .reduce(0, m => m + 1)
      .map(length => ({ length }))

    return h([stream, length])
      .sequence()
  })
  .errors(err => console.error(err))
  .each(x => console.log(x))

Вы можете создать некоторое состояние и вернуть новый поток на основе событий исходного потока:

const h = require('highland')
h([1, 2, 3, 4])
  .through(stream => {
    let length = 0

    return h(push => stream
      .tap(() => ++length)
      .errors(err => push(err))
      .each(x => push(null, x))
      .done(() => {
        push(null, `length: ${length}`)
        push(null, h.nil)
      }))
  })
  .errors(err => console.error(err))
  .each(x => console.log(x))
Другие вопросы по тегам