Bacon.js контролирует буферизацию потока другим потоком
Я хочу буферизовать значения EventStream в Bacon.js точно так же, как buffer(closingSelector)
ведет себя в RxJava. Когда "поток контроллера" (closeSelector в методе RxJava) выдает новое значение, тогда буфер событий сбрасывается.
Поэтому я хочу, чтобы вывод потока был похож на stream.bufferWithTimeOrCount
, но вместо того, чтобы управлять буферизацией с временным интервалом или числом событий, я хочу управлять буферизацией с другим потоком.
Есть ли простой способ реализовать это в Bacon.js?
3 ответа
В Bacon.js не было нужной вам функции, поэтому я посмотрел исходный код bacon.js и написал модифицированную версию holdWhen
,
Bacon.EventStream.prototype.bufferUntilValue = function(valve) {
var valve_ = valve.startWith(false);
return this.filter(false).merge(valve_.flatMapConcat((function(_this) {
return function() {
return _this.scan([], (function(xs, x) {
return xs.concat(x);
}), {
eager: true
}).sampledBy(valve).take(1);
};
})(this)));
};
Чтобы увидеть это в действии, проверьте этот jsFiddle.
Bacon.holdWhen
доступно, так как около 0.7.14 делает почти то, что вы хотите, хотя буферизованные события генерируются одно за другим:
stream.holdWhen (valve) приостанавливает и буферизует поток событий, если последнее событие в вентиле верно. Все буферизованные события освобождаются, когда клапан становится ложным.
Если вам нужно выдавать буферизованные события как одно событие, вы можете попробовать что-то вроде следующего:
// source streams
var sourceObservable = Bacon.interval(1000);
var closingSelector = new Bacon.Bus();
// Constructing a new Observable where we're going to keep our state.
//
// We need to keep track of two things:
// - the buffer that is currently being filled, and
// - a previous buffer that is being flushed.
// The state will then look like this:
// [ buffer, flushed]
// where both buffer and flushed is an array of events from the source observable.
// empty initial state
var initialState = {buffer: [], flushed: []}
// There are two operations on the state: appending a new element to the buffer
// and flushing the current buffer:
// append each event from the source observable to the buffer,
// keeping flushed unchanged
var appends = sourceObservable.map(function(e) {
return function(state) {
state.buffer.push(e); return state;
}
});
// each event from the closingSelector replaces the `flushed` with
// the `buffer`'s contents, inserting an empty buffer.
var flushes = closingSelector.map(function(_) {
return function(state) { return {buffer: [], flushed: state.buffer} }
})
// merge appends and flushes into a single stream and apply them to the initial state
var ops = appends.merge(flushes)
var state = ops.scan(initialState, function(acc, f) { return f(acc) });
// resulting stream of flushed events
var flushed = state.sampledBy(closingSelector).map(function(state) { return state.flushed })
// triggered with `closingSelector.push({})`
flushed.onValue(function(x) { console.log("flushed", x) })
stream.holdWhen(valve)
выглядит почти точно, что вы хотите. Это работает немного по-другому, чем buffer(closingSelector)
: вместо того, чтобы все время буферизовать и очищать буфер при событии из closingSelector
, переключает буферизацию в зависимости от последнего значения в value
поток.
Может быть, вы могли бы использовать holdWhen
как есть, но если вы хотите поведение, как в buffer(closingSelector)
, вы можете сделать что-то вроде этого:
var result = sourceStream.holdWhen(closingSelector.flatMap(function(){
return Bacon.fromArray([false, true]);
}).toProperty(true));
На каждое событие из closingSelector
мы генерируем два события в value
поток со значениями true
а также false
то есть отключение буферизации (которое вызывает сброс), а затем немедленно снова включить его.