WebFlux: как работать с методом takeUntilOther()?
Может кто-нибудь объяснить, как takeUntilOther()
метод работает? Я попытался запустить следующий код, но на моей консоли он ничего не показывает.
Mono.just(10)
.subscribe();
Flux.range(1, 5)
.takeUntilOther(Mono.just(10))
.subscribe(System.out::println);
Не понимаю почему.
2 ответа
Кирилл,
Предлагаю вам обратиться к соответствующей части документации на проектный реактор.
takeUntilOther(Publisher<?> other)
Релейные значения из этого потока до тех пор, пока данный издатель не излучает.
Это означает, что вы будете получать значения из исходного потока, пока не будут Publisher<?> other
начинает производить события. В вашем случае у вас есть популярный издатель just()
который немедленно прерывает исходный поток (вызывая cancel()
метод).
Приведу еще один пример. Взгляните на следующий фрагмент кода:
Flux.range(1, 5) // produces elements from 1 to 5
.delayElements(Duration.ofSeconds(1)) // delays emission of each element from above for 1 second
.takeUntilOther(Mono
.just(10) // hot publisher. emits one element
// delays '10' for 3 seconds. meaning that it will only
// appears in the original Flux in 3 seconds
.delayElement(Duration.ofSeconds(3))
)
.subscribe(System.out::print);
Результат:
12
Добавьте Thread.sleep для ожидания основного потока (или любого потока, выполняющегося в текущем коде), чтобы поток подписчика продолжил процесс. Вот тестовая версия.
@Test
public void flux_Skip_Take_Based_On_Other_Streams() throws InterruptedException {
Flux.range(1, 100) // publisher with elements from 1-100
.delayElements(Duration.ofSeconds(1)) // Flux delay 1 sec before each element emit
.skipUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(10))) // skip the elements until inner Mono emits, i.e. for 10 seconds
.takeUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(70))) // take the elements until inner mono emits, i.e. till 70 seconds
.log()
.subscribe();
Thread.sleep(1000*100); //Sleep the main thread for 100 sec or more to verify the logs
}
выход:
2022-05-27 17:45:33.317 INFO 4180 --- [ main] reactor.Flux.TakeUntilOther.1 : onSubscribe(SerializedSubscriber)
2022-05-27 17:45:33.317 INFO 4180 --- [ main] reactor.Flux.TakeUntilOther.1 : request(unbounded)
2022-05-27 17:45:43.469 INFO 4180 --- [ parallel-12] reactor.Flux.TakeUntilOther.1 : onNext(10)
2022-05-27 17:45:44.485 INFO 4180 --- [ parallel-1] reactor.Flux.TakeUntilOther.1 : onNext(11)
.
.
.
2022-05-27 17:46:42.098 INFO 4180 --- [ parallel-10] reactor.Flux.TakeUntilOther.1 : onNext(68)
2022-05-27 17:46:43.103 INFO 4180 --- [ parallel-11] reactor.Flux.TakeUntilOther.1 : onNext(69)
2022-05-27 17:46:43.306 INFO 4180 --- [ parallel-1] reactor.Flux.TakeUntilOther.1 : onComplete()