Bacon.js контролирует буферизацию потока другим потоком

Я хочу буферизовать значения EventStream в Bacon.js точно так же, как buffer(closingSelector) ведет себя в RxJava. Когда "поток контроллера" (closeSelector в методе RxJava) выдает новое значение, тогда буфер событий сбрасывается.

Поэтому я хочу, чтобы вывод потока был похож на stream.bufferWithTimeOrCount, но вместо того, чтобы управлять буферизацией с временным интервалом или числом событий, я хочу управлять буферизацией с другим потоком.

Есть ли простой способ реализовать это в Bacon.js?

3 ответа

Решение

В Bacon.js не было нужной вам функции, поэтому я посмотрел исходный код bacon.js и написал модифицированную версию holdWhen,

Bacon.EventStream.prototype.bufferUntilValue = function(valve) {
var valve_ = valve.startWith(false);

  return this.filter(false).merge(valve_.flatMapConcat((function(_this) {
    return function() {
        return _this.scan([], (function(xs, x) {
            return xs.concat(x);
        }), {
            eager: true
        }).sampledBy(valve).take(1);
    };
  })(this)));
};

Чтобы увидеть это в действии, проверьте этот jsFiddle.

Bacon.holdWhen доступно, так как около 0.7.14 делает почти то, что вы хотите, хотя буферизованные события генерируются одно за другим:

stream.holdWhen (valve) приостанавливает и буферизует поток событий, если последнее событие в вентиле верно. Все буферизованные события освобождаются, когда клапан становится ложным.

Если вам нужно выдавать буферизованные события как одно событие, вы можете попробовать что-то вроде следующего:

// source streams
var sourceObservable = Bacon.interval(1000);
var closingSelector = new Bacon.Bus();

// Constructing a new Observable where we're going to keep our state.
// 
// We need to keep track of two things: 
//   - the buffer that is currently being filled, and
//   -  a previous buffer that is being flushed.
// The state will then look like this:
//   [ buffer, flushed]
// where both buffer and flushed is an array of events from the source observable.

// empty initial state
var initialState = {buffer: [], flushed: []}

// There are two operations on the state: appending a new element to the buffer 
// and flushing the current buffer:

// append each event from the source observable to the buffer,
// keeping flushed unchanged
var appends = sourceObservable.map(function(e) {
   return function(state) {
       state.buffer.push(e); return state; 
   } 
});

// each event from the closingSelector replaces the `flushed` with 
// the `buffer`'s contents, inserting an empty buffer.
var flushes = closingSelector.map(function(_) {
   return function(state) { return {buffer: [], flushed: state.buffer} }
})

// merge appends and flushes into a single stream and apply them to the initial state
var ops = appends.merge(flushes)
var state = ops.scan(initialState, function(acc, f) { return f(acc) });

// resulting stream of flushed events
var flushed = state.sampledBy(closingSelector).map(function(state) { return state.flushed })

// triggered with `closingSelector.push({})`
flushed.onValue(function(x) { console.log("flushed", x) })

stream.holdWhen(valve) выглядит почти точно, что вы хотите. Это работает немного по-другому, чем buffer(closingSelector): вместо того, чтобы все время буферизовать и очищать буфер при событии из closingSelector, переключает буферизацию в зависимости от последнего значения в value поток.

Может быть, вы могли бы использовать holdWhen как есть, но если вы хотите поведение, как в buffer(closingSelector), вы можете сделать что-то вроде этого:

var result = sourceStream.holdWhen(closingSelector.flatMap(function(){
  return Bacon.fromArray([false, true]);
}).toProperty(true));

На каждое событие из closingSelector мы генерируем два события в value поток со значениями true а также false то есть отключение буферизации (которое вызывает сброс), а затем немедленно снова включить его.

Другие вопросы по тегам