Запустить поток в главном потоке
Я снова сравниваю RxJava с Java 9 Flow. Я вижу, что Flow по умолчанию работает асинхронно, и мне было интересно, есть ли способ заставить его работать синхронно.
Иногда мы просто хотим использовать его не для Nio, а для сахарного синтаксиса и иметь более однородный код.
В RxJava по умолчанию это синхронно, и вы можете заставить его работать асинхронно, используя observerOn
а также subscribeOn
в вашем трубопроводе.
В Flow есть оператор для запуска в основном потоке?
С уважением.
3 ответа
Вы можете определить свой обычай Publisher
как указано в Flow
за использование синхронного исполнения.
Очень простой издатель, который выдает (только по запросу) один ИСТИННЫЙ элемент одному подписчику. Поскольку подписчик получает только один элемент, этот класс не использует буферизацию и управление упорядочением.
class OneShotPublisher implements Publisher<Boolean> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private boolean subscribed; // true after first subscribe
public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
if (subscribed)
subscriber.onError(new IllegalStateException()); // only one allowed
else {
subscribed = true;
subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
}
}
static class OneShotSubscription implements Subscription {
private final Subscriber<? super Boolean> subscriber;
private final ExecutorService executor;
private Future<?> future; // to allow cancellation
private boolean completed;
OneShotSubscription(Subscriber<? super Boolean> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (n != 0 && !completed) {
completed = true;
if (n < 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
future = executor.submit(() -> {
subscriber.onNext(Boolean.TRUE);
subscriber.onComplete();
});
}
}
}
public synchronized void cancel() {
completed = true;
if (future != null) future.cancel(false);
}
}
}
Оператора для этого нет, но API позволяет контролировать способ публикации элементов. Поэтому вы можете просто вызывать методы подписчика непосредственно из текущего потока.
class SynchronousPublisher implements Publisher<Data> {
public synchronized void subscribe(Subscriber<? super Data> subscriber) {
subscriber.onSubscribe(new SynchronousSubscription(subscriber));
}
}
static class SynchronousSubscription implements Subscription {
private final Subscriber<? super Data> subscriber;
SynchronousSubscription(Subscriber<? super Data> subscriber) {
this.subscriber = subscriber;
}
public synchronized void request(long n) {
... // prepare item
subscriber.onNext(someItem);
}
...
}
}
Это зависит от того, что вы подразумеваете под работой в главном потоке.
Если вы хотите принудительно выполнить произвольный поток в определенном потоке, стандартного способа сделать это не существует, если поток не реализован в библиотеке, которая позволяет переопределять части, обеспечивающие асинхронность. В терминах RxJava это Scheduler
предоставлено Schedulers
служебный класс.
Если вы хотите наблюдать за потоком в главном потоке, вы должны написать потребителя очереди блокировки поверх Flow.Subscriber
который блокирует поток, пока в очереди нет элементов. Это может быть сложно, поэтому я отсылаю вас к blockingSubscribe
реализация в Reactive4JavaFlow.
Если вы хотите использовать основной поток Java как Executor
/Scheduler
это еще сложнее и требует аналогичного механизма блокировки, а также некоторых идей от исполнителя потоков пула. Reactive4JavaFlow имеет такой планировщик, который вы можете использовать в качестве исполнителя через: new SubmissionPublisher<>(128, blockingScheduler::schedule)
,