Почему Абонент запрашивает разное количество элементов в разных случаях?

Я изучаю реактивные потоки и утилиту публикации-подписки, и я использую поведение по умолчанию Publisher(Flux в моем случае) и Subscriber.

У меня есть два сценария, оба имеют одинаковое количество элементов в Flux. Но когда я анализирую логи, метод onSubscribe запрашивает разное количество элементов (скажем, в одном случае это запрос на неограниченные элементы, а в другом случае он запрашивает 32 элемента).

Вот два случая и журналы:

        System.out.println("*********Calling MapData************");
        List<Integer> elements = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
          .log()
          .map(i -> i * 2)
          .subscribe(elements::add);
        //printElements(elements);
        System.out.println("-------------------------------------");

        System.out.println("Inside Combine Streams");
        List<Integer> elems = new ArrayList<>();
        Flux.just(10,20,30,40)
            .log()
            .map(x -> x * 2)
            .zipWith(Flux.range(0, Integer.MAX_VALUE),
                (two, one) -> String.format("First  : %d, Second : %d \n", one, two))
            .subscribe(new Consumer<String>() {
              @Override
              public void accept(String s) {

              }
            });
        System.out.println("-------------------------------------");

и вот журналы:

*********Calling MapData************
[warn] LoggerFactory has not been explicitly initialized. Default system-logger will be used. Please invoke StaticLoggerBinder#setLog(org.apache.maven.plugin.logging.Log) with Mojo's Log instance at the early start of your Mojo
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(unbounded)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Inside Combine Streams
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(32)
[info] | onNext(10)
[info] | onNext(20)
[info] | onNext(30)
[info] | onNext(40)
[info] | onComplete()
[info] | cancel()
-------------------------------------

Так как я не использовал никакой настраиваемой реализации подписчика, то почему в случае "MapData" это ведение журнала "[info] | request(unbounded)", а в случае "Inside Combine Streams" это ведение журнала "[info] | request(32)

Пожалуйста, предложите.

1 ответ

Решение

Во-первых, вы должны знать, что это ожидаемое поведение.

В зависимости от операторов, которые вы используете, Reactor будет применять разные стратегии предварительной выборки:

  • некоторые операторы будут использовать значения по умолчанию, такие как 32 или же 256
  • некоторые устройства будут использовать предоставленное вами значение, если вы добавили оператор буферизации с определенным значением
  • Reactor может догадаться, что поток значений конечен, и будет запрашивать неограниченное значение

Вы всегда можете изменить это поведение, если вы используете варианты операторов с int prefetch аргумент метода, или если вы реализуете свой собственный Subscriber с помощью BaseSubscriber (который предоставляет несколько полезных методов для этого).

Суть в том, что вам часто не нужно обращать внимание на это конкретное значение; это может быть полезно, только если вы хотите оптимизировать эту стратегию предварительной выборки для конкретного источника данных.

Другие вопросы по тегам