Условное излучение реактора Flux

Можно ли разрешить выдачу значений из потока условно на основе глобальной логической переменной? Я работаю с Flux delayUntil(...), но не могу полностью понять его функциональность или мои предположения неверны.

У меня есть глобальный AtomicBoolean, который представляет доступность нисходящего соединения и хочу, чтобы восходящий поток испускал только если нисходящий поток готов к обработке.

Для представления сценария создан (не рабочий) тестовый образец

//Randomly generates a boolean value every 5 seconds
private Flux<Boolean> signalGenerator() {
    return Flux.range(1, Integer.MAX_VALUE)
            .delayElements(Duration.ofMillis(5000))
            .map(integer -> new Random().nextBoolean());
}

а также

   Flux.range(1, Integer.MAX_VALUE)
            .delayElements(Duration.ofMillis(1000))
            .delayUntil(evt -> signalGenerator())     // ??  Only proceed when signalGenerator returns true
            .subscribe(System.out::println);

У меня есть другой сценарий, в котором процесс ниже по потоку может принимать только x сообщений в секунду. В текущей нереактивной реализации у нас есть семафор из x разрешений, и поток блокируется, если больше нет разрешений, с семафором разрешений, сбрасываемых каждую секунду.

В обоих сценариях я хочу, чтобы поток исходящего потока генерировался только тогда, когда есть запрос от последующего процесса, и я не хочу буферизовать.

1 ответ

Вы можете подумать об использовании Mono.fromRunnable() как вход в delayUntil() как показано ниже;

Класс помощника;

      public class FluxCondition {

    CountDownLatch latch = new CountDownLatch(10); // it depends, might be managed somehow
    Runnable r = () -> { latch.await(); }

    public void lock() { Mono.fromRunnable(r) };
    public void release() { latch.countDown(); }
}
    

Использование;

      FluxCondition delayCondition = new FluxCondition();
Flux.range(1, 10).delayUntil(o -> delayCondition.lock()).subscribe();
.....
delayCondition.release(); // shall call this for each element

Я предполагаю, что может быть лучшее решение, используяink.emitNext, но для этого также может потребоваться переменная условия для управления потоком Flux.

Насколько я понимаю, в реактивном программировании ваши данные должны учитываться на каждом шаге оператора. Так что, возможно, вам будет лучше спроектировать своего потребителя как реактивный процессор. В моем случае у меня не было шансов и я пошел по тому пути, который я описал выше.

Другие вопросы по тегам