Почему Абонент запрашивает разное количество элементов в разных случаях?
Я изучаю реактивные потоки и утилиту публикации-подписки, и я использую поведение по умолчанию Publisher(Flux в моем случае) и Subscriber.
У меня есть два сценария, оба имеют одинаковое количество элементов в Flux. Но когда я анализирую логи, метод onSubscribe запрашивает разное количество элементов (скажем, в одном случае это запрос на неограниченные элементы, а в другом случае он запрашивает 32 элемента).
Вот два случая и журналы:
System.out.println("*********Calling MapData************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribe(elements::add);
//printElements(elements);
System.out.println("-------------------------------------");
System.out.println("Inside Combine Streams");
List<Integer> elems = new ArrayList<>();
Flux.just(10,20,30,40)
.log()
.map(x -> x * 2)
.zipWith(Flux.range(0, Integer.MAX_VALUE),
(two, one) -> String.format("First : %d, Second : %d \n", one, two))
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
}
});
System.out.println("-------------------------------------");
и вот журналы:
*********Calling MapData************
[warn] LoggerFactory has not been explicitly initialized. Default system-logger will be used. Please invoke StaticLoggerBinder#setLog(org.apache.maven.plugin.logging.Log) with Mojo's Log instance at the early start of your Mojo
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(unbounded)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Inside Combine Streams
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(32)
[info] | onNext(10)
[info] | onNext(20)
[info] | onNext(30)
[info] | onNext(40)
[info] | onComplete()
[info] | cancel()
-------------------------------------
Так как я не использовал никакой настраиваемой реализации подписчика, то почему в случае "MapData" это ведение журнала "[info] | request(unbounded)", а в случае "Inside Combine Streams" это ведение журнала "[info] | request(32)
Пожалуйста, предложите.
1 ответ
Во-первых, вы должны знать, что это ожидаемое поведение.
В зависимости от операторов, которые вы используете, Reactor будет применять разные стратегии предварительной выборки:
- некоторые операторы будут использовать значения по умолчанию, такие как
32
или же256
- некоторые устройства будут использовать предоставленное вами значение, если вы добавили оператор буферизации с определенным значением
- Reactor может догадаться, что поток значений конечен, и будет запрашивать неограниченное значение
Вы всегда можете изменить это поведение, если вы используете варианты операторов с int prefetch
аргумент метода, или если вы реализуете свой собственный Subscriber
с помощью BaseSubscriber
(который предоставляет несколько полезных методов для этого).
Суть в том, что вам часто не нужно обращать внимание на это конкретное значение; это может быть полезно, только если вы хотите оптимизировать эту стратегию предварительной выборки для конкретного источника данных.