Upstream не отменяется, когда более одного оператора toProcessor()?
Когда я использовал оператор toProcessor() более одного раза, как в следующем фрагменте кода, восходящий Mono не будет отменен.
Вопросы:
- Может ли кто-нибудь объяснить, почему это происходит?
- Означает ли это, что использование toProcessor более одного раза неправильно?
Пример кода:
Mono<Integer> justOne = Mono.delay(Duration.ofMillis(10))
.then(Mono.just(1))
.log("justOne");
MonoProcessor<Integer> justOneHot = justOne
.toProcessor()
.log("justOneHot")
.toProcessor();
justOneHot.cancel();
justOneHot.block();
Вывод предыдущего кода:
[main] INFO justOne - | onSubscribe([Fuseable] MonoIgnoreThen.ThenIgnoreMain)
[main] INFO justOne - | request(unbounded)
[main] INFO justOneHot - onSubscribe([Fuseable] ReplayProcessor.ReplayInner)
[main] INFO justOneHot - request(unbounded)
[main] INFO justOneHot - cancel()