Самый эффективный способ превратить Observable в ObservableValue/Binding/EventStream?

Я буду использовать RxJava и ReactFX более интенсивно, но я пытаюсь понять, как их согласовать, поскольку ReactFX не имеет зависимости от RxJava, так как же они могут общаться друг с другом в одной и той же монаде? Это особенно актуально для соединения между JavaFX ObservableValueRxJava's Observableи ReactFX StreamEvent без большого количества шаблонов.

Я хочу составить основную бизнес-логику с помощью RxJava, поскольку они не всегда будут поддерживать приложения JavaFX. Но я хочу использовать интерфейс JavaFX ReactFX и использовать EventStream, Поэтому мой вопрос в том, что является наиболее эффективным способом EventStream в Observableи Observable в EventStream, Binding, или же ObservableValue? Я знаю, что могу просто использовать RxJava, но хочу использовать безопасность и удобство потоков в ReactFX...

//DESIRE 1- Turn EventStream into Observable in the same monad
Observable<Foo> obs = EventStream.valuesOf(fooObservableValue).toObservable();

//Desire 2- Turn Observable into ObservableValue, Eventstream, or Binding
Binding<Foo> obsVal = Observable.create(...).toBinding();

2 ответа

Решение

Чтобы включить ReactFX EventStream в RxJava Observable:

Observable<Foo> toRx(EventStream<Foo> es) {
    PublishSubject<Foo> sub = PublishSubject.create();
    es.subscribe(sub::onNext);
    return sub;
}

Превратить RxJava Observable в ReactFX EventStream:

EventStream<Foo> fromRx(Observable<Foo> obs) {
    EventSource<Foo> es = new EventSource<>();
    obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)));
    return es;
}

Обратите внимание на Platform.runLater(...) в последнем случае. Это делает в результате EventStream генерировать события в потоке приложения JavaFX.

Также обратите внимание, что мы игнорируем Subscriptionс вернулся subscribe методы в обоих случаях. Это нормально, если вы устанавливаете привязку на весь срок действия вашего приложения. Если, с другой стороны, связь между ними должна быть недолгой, в первом случае ваш RxJava-компонент предоставит Subjectваш компонент ReactFX выставляет EventStream, а затем сделать subscribe/unsubscribe как необходимо. Аналогично для второго случая.

Я не знаком с ReactFX, но, глядя на API, я могу вывести следующие преобразования:

public static <T> Observable<T> toObservable(EventStream<? extends T> es) {
    return Observable.create(child -> {
        Subscription s = es.subscribe(child::onNext);
        child.add(Subscriptions.create(s::unsubscribe));
    });
}
public static <T> EventStream<T> toEventStream(Observable<? extends T> o) {
    return new EventStream<T>() {
        final Vector<Consumer<? super T>> observers = new Vector<>();
        @Override
        public void addObserver(Consumer<? super T> observer) {
            observers.add(observer);
        }

        @Override
        public void removeObserver(Consumer<? super T> observer) {
            observers.remove(observer);
        }
        @Override
        public Subscription subscribe(Consumer<? super T> subscriber) {
            addObserver(subscriber);

            rx.Subscriber<T> s = new rx.Subscriber<T>() {
                @Override
                public void onNext(T t) {
                    subscriber.accept(t);
                }
                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    removeObserver(subscriber);
                }
                @Override
                public void onCompleted() {
                    removeObserver(subscriber);
                }
            };
            o.subscribe(s);

            return () -> {
                s.unsubscribe();
                removeObserver(subscriber);
            };
        }
    };
}

Оба должны дать вам возможность отписаться, хотя ReactFX не поддерживает синхронную отписку, и я не вижу, можно ли использовать EventStream в качестве горячей или холодной наблюдаемой. Я не мог получить доступ к Binding, поэтому не могу помочь вам там.

Другие вопросы по тегам