Запустить поток в главном потоке

Я снова сравниваю RxJava с Java 9 Flow. Я вижу, что Flow по умолчанию работает асинхронно, и мне было интересно, есть ли способ заставить его работать синхронно.

Иногда мы просто хотим использовать его не для Nio, а для сахарного синтаксиса и иметь более однородный код.

В RxJava по умолчанию это синхронно, и вы можете заставить его работать асинхронно, используя observerOn а также subscribeOn в вашем трубопроводе.

В Flow есть оператор для запуска в основном потоке?

С уважением.

3 ответа

Решение

Вы можете определить свой обычай Publisher как указано в Flow за использование синхронного исполнения.

Очень простой издатель, который выдает (только по запросу) один ИСТИННЫЙ элемент одному подписчику. Поскольку подписчик получает только один элемент, этот класс не использует буферизацию и управление упорядочением.

class OneShotPublisher implements Publisher<Boolean> {
   private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
   private boolean subscribed; // true after first subscribe
   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
     if (subscribed)
       subscriber.onError(new IllegalStateException()); // only one allowed
     else {
       subscribed = true;
       subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
     }
   }
   static class OneShotSubscription implements Subscription {
     private final Subscriber<? super Boolean> subscriber;
     private final ExecutorService executor;
     private Future<?> future; // to allow cancellation
     private boolean completed;
     OneShotSubscription(Subscriber<? super Boolean> subscriber,
                         ExecutorService executor) {
       this.subscriber = subscriber;
       this.executor = executor;
     }
     public synchronized void request(long n) {
       if (n != 0 && !completed) {
         completed = true;
         if (n < 0) {
           IllegalArgumentException ex = new IllegalArgumentException();
           executor.execute(() -> subscriber.onError(ex));
         } else {
           future = executor.submit(() -> {
             subscriber.onNext(Boolean.TRUE);
             subscriber.onComplete();
           });
         }
       }
     }
     public synchronized void cancel() {
       completed = true;
       if (future != null) future.cancel(false);
     }
   }
 }

Оператора для этого нет, но API позволяет контролировать способ публикации элементов. Поэтому вы можете просто вызывать методы подписчика непосредственно из текущего потока.

class SynchronousPublisher implements Publisher<Data>  {
      public synchronized void subscribe(Subscriber<? super Data> subscriber) {
           subscriber.onSubscribe(new SynchronousSubscription(subscriber));
      }
 }
 static class SynchronousSubscription implements Subscription {
      private final Subscriber<? super Data> subscriber;

       SynchronousSubscription(Subscriber<? super Data> subscriber) {
          this.subscriber = subscriber;
       }
       public synchronized void request(long n) {
           ... // prepare item            
           subscriber.onNext(someItem);      
       }

       ...
   }
}

Это зависит от того, что вы подразумеваете под работой в главном потоке.

Если вы хотите принудительно выполнить произвольный поток в определенном потоке, стандартного способа сделать это не существует, если поток не реализован в библиотеке, которая позволяет переопределять части, обеспечивающие асинхронность. В терминах RxJava это Schedulerпредоставлено Schedulers служебный класс.

Если вы хотите наблюдать за потоком в главном потоке, вы должны написать потребителя очереди блокировки поверх Flow.Subscriber который блокирует поток, пока в очереди нет элементов. Это может быть сложно, поэтому я отсылаю вас к blockingSubscribe реализация в Reactive4JavaFlow.

Если вы хотите использовать основной поток Java как Executor/Schedulerэто еще сложнее и требует аналогичного механизма блокировки, а также некоторых идей от исполнителя потоков пула. Reactive4JavaFlow имеет такой планировщик, который вы можете использовать в качестве исполнителя через: new SubmissionPublisher<>(128, blockingScheduler::schedule),

Другие вопросы по тегам