Опубликовать против подписки в Project Reactor 3

Я использую publishOn против подписки на одном и том же потоке следующим образом:

    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
      .map(i -> i * 2)
      .log()
      .publishOn(Schedulers.elastic())
      .subscribeOn(Schedulers.parallel())
      .subscribe(elements::add);
    System.out.println("-------------------------------------");

Хотя, когда я использую оба, ничего не печатается в журналах. Но когда я использую только publishOn, я получаю следующие информационные журналы:

*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------

Это publishOn более рекомендуется, чем subscribeOn? Или это имеет больше предпочтений, чем подписка? В чем разница между двумя и когда использовать какой?

4 ответа

Решение

Вот небольшая документация, которую я получил:

publishOn применяется так же, как и любой другой оператор, в середине цепочки подписчиков. Он принимает сигналы из нисходящего потока и воспроизводит их в обратном направлении, одновременно выполняя обратный вызов на рабочем месте из связанного планировщика. Следовательно, это влияет на то, где будут выполняться последующие операторы (пока не будет включен другой publishOn).

подписка применяется к процессу подписки, когда создается обратная цепочка. Как следствие, независимо от того, где вы размещаете подписку в цепочке, это всегда влияет на контекст исходной эмиссии. Однако это не влияет на поведение последующих вызовов publishOn. Они по-прежнему переключают контекст выполнения для части цепочки после них.

а также

publishOn заставляет следующий оператор (и, возможно, последующие операторы после следующего) запускаться в другом потоке. Точно так же, подписка заставляет предыдущий оператор (и, возможно, операторы, предшествующие предыдущему) запускаться в другом потоке.

Мне потребовалось время, чтобы понять это, может быть, потому что publishOn обычно объясняется перед subscribeOn, вот, надеюсь, более простое объяснение непрофессионала.

subscribeOn означает запуск исходного излучения источника, например subscribe(), onSubscribe() and request() для указанного рабочего планировщика (другого потока), а также то же самое для любых последующих операций, таких как, например, onNext/onError/onComplete, map etc и независимо от положения subscribeOn() такое поведение будет

И если вы ничего не сделали publishOn в беглых вызовах все, все будет работать в таком потоке.

Но как только ты позвонишь publishOn() скажем посередине, тогда любой последующий вызов оператора будет выполняться на предоставленном работнике планировщика для такого publishOn().

вот пример

Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());

Flux.range(1, 5)
        .doOnNext(consumer)
        .map(i -> {
          System.out.println("Inside map the thread is " + Thread.currentThread().getName());
          return i * 10;
        })
        .publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
        .doOnNext(consumer)
        .publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
        .doOnNext(consumer)
        .subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
        .subscribe();

Результат будет


1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5

Как видите первый doOnNext() и следующие map() выполняется в потоке с именем subscribeOn_thread, это происходит до тех пор, пока publishOn() вызывается, то любой последующий вызов будет выполняться в поставленном планировщике для этого publishOn() и снова это произойдет для любого последующего звонка, пока кто-нибудь не позвонит другому publishOn().

применяется в середине цепочки. Это влияет на последующие операторы послеpublishOn- они будут выполняться в потоке, выбранном из планировщика публикации.

       Flux.range(1, 2)
      .map(i -> {
          System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .publishOn(schedulerA)
      .map(i -> {
          System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .blockLast();

Результат:

        First map - (1), Thread: main
  First map - (2), Thread: main
  Second map - (1), Thread: scheduler-a-1
  Second map - (2), Thread: scheduler-a-1

Если вы разместитеsubscribeOnв цепочке, это влияет на выбросы источника во всей цепочке

      Flux.range(1, 2)
      .map(i -> {
          System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .subscribeOn(schedulerA)
      .map(i -> {
          System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .blockLast();

Результат:

        First map - (1), Thread: scheduler-a-1
  Second map - (1), Thread: scheduler-a-1
  First map - (2), Thread: scheduler-a-1
  Second map - (2), Thread: scheduler-a-1

справочную статью можно найти здесь

Ниже приводится отрывок из отличного сообщения в блоге https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers.

publishOn

Это базовый оператор, который вам нужен, когда вы хотите переключать потоки. Входящие сигналы от его источника публикуются в данном планировщике, эффективно переключают потоки на одного из рабочих этого планировщика.

Это действительно для onNext, onComplete и onErrorсигналы. То есть сигналы, которые проходят от восходящего источника к нисходящему абоненту.

Таким образом, по сути, каждый шаг обработки, который появляется под этим оператором, будет выполняться в новом Scheduler s, пока другой оператор снова не переключится (например, другой publishOn).

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

Вывод

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

подписаться

Этот оператор меняет место выполнения метода подписки. А поскольку сигнал подписки течет вверх, он напрямую влияет на то, где исходный Flux подписывается и начинает генерировать данные.

Как следствие, может показаться, что он воздействует на части реактивной цепочки операторов вверх и вниз (до тех пор, пока нет publishOn добавлено в смесь):

final Flux<String> fetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
           .map(url -> blockingWebClient.get(url));
}

// sample code:
fetchUrls(A, B, C)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

fetchUrls(D, E)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

Вывод

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C