Опубликовать против подписки в Project Reactor 3
Я использую publishOn против подписки на одном и том же потоке следующим образом:
System.out.println("*********Calling Concurrency************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.map(i -> i * 2)
.log()
.publishOn(Schedulers.elastic())
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
System.out.println("-------------------------------------");
Хотя, когда я использую оба, ничего не печатается в журналах. Но когда я использую только publishOn, я получаю следующие информационные журналы:
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Это publishOn более рекомендуется, чем subscribeOn? Или это имеет больше предпочтений, чем подписка? В чем разница между двумя и когда использовать какой?
4 ответа
Вот небольшая документация, которую я получил:
publishOn применяется так же, как и любой другой оператор, в середине цепочки подписчиков. Он принимает сигналы из нисходящего потока и воспроизводит их в обратном направлении, одновременно выполняя обратный вызов на рабочем месте из связанного планировщика. Следовательно, это влияет на то, где будут выполняться последующие операторы (пока не будет включен другой publishOn).
подписка применяется к процессу подписки, когда создается обратная цепочка. Как следствие, независимо от того, где вы размещаете подписку в цепочке, это всегда влияет на контекст исходной эмиссии. Однако это не влияет на поведение последующих вызовов publishOn. Они по-прежнему переключают контекст выполнения для части цепочки после них.
а также
publishOn заставляет следующий оператор (и, возможно, последующие операторы после следующего) запускаться в другом потоке. Точно так же, подписка заставляет предыдущий оператор (и, возможно, операторы, предшествующие предыдущему) запускаться в другом потоке.
Мне потребовалось время, чтобы понять это, может быть, потому что publishOn
обычно объясняется перед subscribeOn
, вот, надеюсь, более простое объяснение непрофессионала.
subscribeOn
означает запуск исходного излучения источника, например subscribe(), onSubscribe() and request()
для указанного рабочего планировщика (другого потока), а также то же самое для любых последующих операций, таких как, например, onNext/onError/onComplete, map etc
и независимо от положения subscribeOn() такое поведение будет
И если вы ничего не сделали publishOn
в беглых вызовах все, все будет работать в таком потоке.
Но как только ты позвонишь publishOn()
скажем посередине, тогда любой последующий вызов оператора будет выполняться на предоставленном работнике планировщика для такого publishOn()
.
вот пример
Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());
Flux.range(1, 5)
.doOnNext(consumer)
.map(i -> {
System.out.println("Inside map the thread is " + Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
.doOnNext(consumer)
.publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
.doOnNext(consumer)
.subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
.subscribe();
Результат будет
1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5
Как видите первый doOnNext()
и следующие map()
выполняется в потоке с именем subscribeOn_thread
, это происходит до тех пор, пока publishOn()
вызывается, то любой последующий вызов будет выполняться в поставленном планировщике для этого publishOn()
и снова это произойдет для любого последующего звонка, пока кто-нибудь не позвонит другому publishOn()
.
применяется в середине цепочки. Это влияет на последующие операторы послеpublishOn
- они будут выполняться в потоке, выбранном из планировщика публикации.
Flux.range(1, 2)
.map(i -> {
System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.publishOn(schedulerA)
.map(i -> {
System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.blockLast();
Результат:
First map - (1), Thread: main
First map - (2), Thread: main
Second map - (1), Thread: scheduler-a-1
Second map - (2), Thread: scheduler-a-1
Если вы разместитеsubscribeOn
в цепочке, это влияет на выбросы источника во всей цепочке
Flux.range(1, 2)
.map(i -> {
System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.subscribeOn(schedulerA)
.map(i -> {
System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
return i;
})
.blockLast();
Результат:
First map - (1), Thread: scheduler-a-1
Second map - (1), Thread: scheduler-a-1
First map - (2), Thread: scheduler-a-1
Second map - (2), Thread: scheduler-a-1
справочную статью можно найти здесь
Ниже приводится отрывок из отличного сообщения в блоге https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers.
publishOn
Это базовый оператор, который вам нужен, когда вы хотите переключать потоки. Входящие сигналы от его источника публикуются в данном планировщике, эффективно переключают потоки на одного из рабочих этого планировщика.
Это действительно для
onNext
,
onComplete
и
onError
сигналы. То есть сигналы, которые проходят от восходящего источника к нисходящему абоненту.
Таким образом, по сути, каждый шаг обработки, который появляется под этим оператором, будет выполняться в новом Scheduler s, пока другой оператор снова не переключится (например, другой
publishOn
).
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
Вывод
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
подписаться
Этот оператор меняет место выполнения метода подписки. А поскольку сигнал подписки течет вверх, он напрямую влияет на то, где исходный Flux подписывается и начинает генерировать данные.
Как следствие, может показаться, что он воздействует на части реактивной цепочки операторов вверх и вниз (до тех пор, пока нет
publishOn
добавлено в смесь):
final Flux<String> fetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.map(url -> blockingWebClient.get(url));
}
// sample code:
fetchUrls(A, B, C)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
fetchUrls(D, E)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
Вывод
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C