Самый эффективный способ превратить Observable в ObservableValue/Binding/EventStream?
Я буду использовать RxJava и ReactFX более интенсивно, но я пытаюсь понять, как их согласовать, поскольку ReactFX не имеет зависимости от RxJava, так как же они могут общаться друг с другом в одной и той же монаде? Это особенно актуально для соединения между JavaFX ObservableValue
RxJava's Observable
и ReactFX StreamEvent
без большого количества шаблонов.
Я хочу составить основную бизнес-логику с помощью RxJava, поскольку они не всегда будут поддерживать приложения JavaFX. Но я хочу использовать интерфейс JavaFX ReactFX
и использовать EventStream
, Поэтому мой вопрос в том, что является наиболее эффективным способом EventStream
в Observable
и Observable
в EventStream
, Binding
, или же ObservableValue
? Я знаю, что могу просто использовать RxJava, но хочу использовать безопасность и удобство потоков в ReactFX...
//DESIRE 1- Turn EventStream into Observable in the same monad
Observable<Foo> obs = EventStream.valuesOf(fooObservableValue).toObservable();
//Desire 2- Turn Observable into ObservableValue, Eventstream, or Binding
Binding<Foo> obsVal = Observable.create(...).toBinding();
2 ответа
Чтобы включить ReactFX EventStream
в RxJava Observable
:
Observable<Foo> toRx(EventStream<Foo> es) {
PublishSubject<Foo> sub = PublishSubject.create();
es.subscribe(sub::onNext);
return sub;
}
Превратить RxJava Observable
в ReactFX EventStream
:
EventStream<Foo> fromRx(Observable<Foo> obs) {
EventSource<Foo> es = new EventSource<>();
obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)));
return es;
}
Обратите внимание на Platform.runLater(...)
в последнем случае. Это делает в результате EventStream
генерировать события в потоке приложения JavaFX.
Также обратите внимание, что мы игнорируем Subscription
с вернулся subscribe
методы в обоих случаях. Это нормально, если вы устанавливаете привязку на весь срок действия вашего приложения. Если, с другой стороны, связь между ними должна быть недолгой, в первом случае ваш RxJava-компонент предоставит Subject
ваш компонент ReactFX выставляет EventStream
, а затем сделать subscribe
/unsubscribe
как необходимо. Аналогично для второго случая.
Я не знаком с ReactFX, но, глядя на API, я могу вывести следующие преобразования:
public static <T> Observable<T> toObservable(EventStream<? extends T> es) {
return Observable.create(child -> {
Subscription s = es.subscribe(child::onNext);
child.add(Subscriptions.create(s::unsubscribe));
});
}
public static <T> EventStream<T> toEventStream(Observable<? extends T> o) {
return new EventStream<T>() {
final Vector<Consumer<? super T>> observers = new Vector<>();
@Override
public void addObserver(Consumer<? super T> observer) {
observers.add(observer);
}
@Override
public void removeObserver(Consumer<? super T> observer) {
observers.remove(observer);
}
@Override
public Subscription subscribe(Consumer<? super T> subscriber) {
addObserver(subscriber);
rx.Subscriber<T> s = new rx.Subscriber<T>() {
@Override
public void onNext(T t) {
subscriber.accept(t);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
removeObserver(subscriber);
}
@Override
public void onCompleted() {
removeObserver(subscriber);
}
};
o.subscribe(s);
return () -> {
s.unsubscribe();
removeObserver(subscriber);
};
}
};
}
Оба должны дать вам возможность отписаться, хотя ReactFX не поддерживает синхронную отписку, и я не вижу, можно ли использовать EventStream в качестве горячей или холодной наблюдаемой. Я не мог получить доступ к Binding, поэтому не могу помочь вам там.