Условное излучение реактора Flux
Можно ли разрешить выдачу значений из потока условно на основе глобальной логической переменной? Я работаю с Flux delayUntil(...), но не могу полностью понять его функциональность или мои предположения неверны.
У меня есть глобальный AtomicBoolean, который представляет доступность нисходящего соединения и хочу, чтобы восходящий поток испускал только если нисходящий поток готов к обработке.
Для представления сценария создан (не рабочий) тестовый образец
//Randomly generates a boolean value every 5 seconds
private Flux<Boolean> signalGenerator() {
return Flux.range(1, Integer.MAX_VALUE)
.delayElements(Duration.ofMillis(5000))
.map(integer -> new Random().nextBoolean());
}
а также
Flux.range(1, Integer.MAX_VALUE)
.delayElements(Duration.ofMillis(1000))
.delayUntil(evt -> signalGenerator()) // ?? Only proceed when signalGenerator returns true
.subscribe(System.out::println);
У меня есть другой сценарий, в котором процесс ниже по потоку может принимать только x сообщений в секунду. В текущей нереактивной реализации у нас есть семафор из x разрешений, и поток блокируется, если больше нет разрешений, с семафором разрешений, сбрасываемых каждую секунду.
В обоих сценариях я хочу, чтобы поток исходящего потока генерировался только тогда, когда есть запрос от последующего процесса, и я не хочу буферизовать.
1 ответ
Вы можете подумать об использовании
Mono.fromRunnable()
как вход в
delayUntil()
как показано ниже;
Класс помощника;
public class FluxCondition {
CountDownLatch latch = new CountDownLatch(10); // it depends, might be managed somehow
Runnable r = () -> { latch.await(); }
public void lock() { Mono.fromRunnable(r) };
public void release() { latch.countDown(); }
}
Использование;
FluxCondition delayCondition = new FluxCondition();
Flux.range(1, 10).delayUntil(o -> delayCondition.lock()).subscribe();
.....
delayCondition.release(); // shall call this for each element
Я предполагаю, что может быть лучшее решение, используяink.emitNext, но для этого также может потребоваться переменная условия для управления потоком Flux.
Насколько я понимаю, в реактивном программировании ваши данные должны учитываться на каждом шаге оператора. Так что, возможно, вам будет лучше спроектировать своего потребителя как реактивный процессор. В моем случае у меня не было шансов и я пошел по тому пути, который я описал выше.