WebFlux: как работать с методом takeUntilOther()?

Может кто-нибудь объяснить, как takeUntilOther()метод работает? Я попытался запустить следующий код, но на моей консоли он ничего не показывает.

     Mono.just(10)
                .subscribe();

        Flux.range(1, 5)
                .takeUntilOther(Mono.just(10))
                .subscribe(System.out::println);

Не понимаю почему.

2 ответа

Решение

Кирилл,

Предлагаю вам обратиться к соответствующей части документации на проектный реактор.

takeUntilOther(Publisher<?> other) Релейные значения из этого потока до тех пор, пока данный издатель не излучает.

Это означает, что вы будете получать значения из исходного потока, пока не будут Publisher<?> otherначинает производить события. В вашем случае у вас есть популярный издатель just() который немедленно прерывает исходный поток (вызывая cancel() метод).

Приведу еще один пример. Взгляните на следующий фрагмент кода:

Flux.range(1, 5) // produces elements from 1 to 5
        .delayElements(Duration.ofSeconds(1)) // delays emission of each element from above for 1 second
        .takeUntilOther(Mono
                .just(10) // hot publisher. emits one element

                // delays '10' for 3 seconds. meaning that it will only 
                // appears in the original Flux in 3 seconds
                .delayElement(Duration.ofSeconds(3)) 
        )
        .subscribe(System.out::print);

Результат:

12

Добавьте Thread.sleep для ожидания основного потока (или любого потока, выполняющегося в текущем коде), чтобы поток подписчика продолжил процесс. Вот тестовая версия.

          @Test
    public void flux_Skip_Take_Based_On_Other_Streams() throws InterruptedException {
        
        Flux.range(1, 100) // publisher with elements from 1-100
                .delayElements(Duration.ofSeconds(1)) // Flux delay 1 sec before each element emit
                .skipUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(10))) // skip the elements until inner Mono emits, i.e.  for 10 seconds
                .takeUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(70))) // take the elements until inner mono emits, i.e. till 70 seconds
                .log()
                .subscribe();
    
        Thread.sleep(1000*100); //Sleep the main thread for 100 sec or more to verify the logs
}

выход:

      2022-05-27 17:45:33.317  INFO 4180 --- [           main] reactor.Flux.TakeUntilOther.1            : onSubscribe(SerializedSubscriber)
2022-05-27 17:45:33.317  INFO 4180 --- [           main] reactor.Flux.TakeUntilOther.1            : request(unbounded)
2022-05-27 17:45:43.469  INFO 4180 --- [    parallel-12] reactor.Flux.TakeUntilOther.1            : onNext(10)
2022-05-27 17:45:44.485  INFO 4180 --- [     parallel-1] reactor.Flux.TakeUntilOther.1            : onNext(11)
.
.
.
2022-05-27 17:46:42.098  INFO 4180 --- [    parallel-10] reactor.Flux.TakeUntilOther.1            : onNext(68)
2022-05-27 17:46:43.103  INFO 4180 --- [    parallel-11] reactor.Flux.TakeUntilOther.1            : onNext(69)
2022-05-27 17:46:43.306  INFO 4180 --- [     parallel-1] reactor.Flux.TakeUntilOther.1            : onComplete()
Другие вопросы по тегам