Upstream не отменяется, когда более одного оператора toProcessor()?

Когда я использовал оператор toProcessor() более одного раза, как в следующем фрагменте кода, восходящий Mono не будет отменен.

Вопросы:

  1. Может ли кто-нибудь объяснить, почему это происходит?
  2. Означает ли это, что использование toProcessor более одного раза неправильно?

Пример кода:

    Mono<Integer> justOne = Mono.delay(Duration.ofMillis(10))
            .then(Mono.just(1))
            .log("justOne");

    MonoProcessor<Integer> justOneHot = justOne
            .toProcessor()
            .log("justOneHot")
            .toProcessor();

    justOneHot.cancel();
    justOneHot.block();

Вывод предыдущего кода:

    [main] INFO justOne - | onSubscribe([Fuseable] MonoIgnoreThen.ThenIgnoreMain)
    [main] INFO justOne - | request(unbounded)
    [main] INFO justOneHot - onSubscribe([Fuseable] ReplayProcessor.ReplayInner)
    [main] INFO justOneHot - request(unbounded)
    [main] INFO justOneHot - cancel()

0 ответов

Другие вопросы по тегам