EventBus/PubSub против (реактивных расширений) RX с точки зрения ясности кода в однопоточном приложении

В настоящее время я использую архитектуру / шаблон EventBus / PubSub с Scala (и JavaFX) для реализации простого приложения для организации заметок (вроде клиента Evernote с некоторыми дополнительными функциями отображения разума), и я должен сказать, что мне действительно нравится EventBus поверх образец наблюдателя.

Вот некоторые библиотеки EventBus:

https://code.google.com/p/guava-libraries/wiki/EventBusExplained

http://eventbus.org/ (в настоящее время, кажется, не работает), это то, что я использую в своей реализации.

http://greenrobot.github.io/EventBus/

Вот сравнение библиотек EventBus: http://codeblock.engio.net/37/

EventBus связан с шаблоном публикации-подписки.

Тем не мение!

Недавно я прошел курс Reactive от Coursera и начал задаваться вопросом, упростит ли использование RXJava вместо EventBus код обработки событий еще больше в однопоточном приложении?

Я хотел бы спросить об опыте людей, которые программировали с использованием обеих технологий (какая-то библиотека событий и некоторая форма реактивных расширений (RX)): было ли проще справиться со сложностью обработки событий с помощью RX, чем с архитектурой шины событий чтобы не было необходимости использовать несколько потоков?

Я спрашиваю об этом, потому что я слышал в Reactive Lectures на Coursera, что RX приводит к гораздо более чистому коду, чем использование шаблона наблюдателя (т. Е. Нет "ада обратного вызова"), однако я не нашел никакого сравнения между архитектурой EventBus и RXJava. Итак, ясно, что и EventBus, и RXJava лучше, чем шаблон наблюдателя, но что лучше в однопоточных приложениях с точки зрения ясности кода и удобства сопровождения?

Если я правильно понимаю, основной смысл продажи RXJava заключается в том, что его можно использовать для создания отзывчивых приложений, если есть операции блокировки (например, ожидание ответа от сервера).

Но меня не волнует асихронность, все, что меня волнует, - это поддержание кода в чистоте, неразберихе и легкости рассуждения в однопоточном приложении.

В этом случае все же лучше использовать RXJava, чем EventBus?

Я думаю, что EventBus был бы более простым и чистым решением, и я не вижу никакой причины, почему я должен использовать RXJava для однопоточного приложения в пользу простой архитектуры EventBus.

Но я могу ошибаться!

Пожалуйста, исправьте меня, если я ошибаюсь, и объясните, почему RXJava будет лучше, чем простой EventBus, в случае однопоточного приложения, где не выполняются операции блокировки.

4 ответа

Решение

Ниже приведены преимущества использования реактивных потоков событий в однопоточном синхронном приложении.

1. Более декларативный, меньше побочных эффектов и менее изменчивое состояние.

Потоки событий способны инкапсулировать логику и состояние, потенциально оставляя ваш код без побочных эффектов и изменяемых переменных.

Рассмотрим приложение, которое считает количество нажатий кнопок и отображает количество нажатий в виде метки.

Простое решение JavaFX:

private int counter = 0; // mutable field!!!

Button incBtn = new Button("Increment");
Label label = new Label("0");

incBtn.addEventHandler(ACTION, a -> {
    label.setText(Integer.toString(++counter)); // side-effect!!!
});

Решение ReactFX:

Button incBtn = new Button("Increment");
Label label = new Label("0");

EventStreams.eventsOf(incBtn, ACTION)
        .accumulate(0, (n, a) -> n + 1)
        .map(Object::toString)
        .feedTo(label.textProperty());

Изменяемая переменная не используется, а побочное действие присваивается label.textProperty() скрыт за абстракцией.

В своей магистерской диссертации Eugen предложил интегрировать ReactFX со Scala. Используя его интеграцию, решение может выглядеть так:

val incBtn = new Button("Increment")
val label = new Label("0")

label.text |= EventStreams.eventsOf(incBtn, ACTION)
    .accumulate(0, (n, a) => n + 1)
    .map(n => n.toString)

Это эквивалентно предыдущему, с дополнительным преимуществом устранения инверсии управления.

2. Средства для устранения глюков и лишних вычислений. (Только ReactFX)

Глюки - это временные несоответствия в наблюдаемом состоянии. ReactFX имеет средства для приостановки распространения событий до тех пор, пока не будут обработаны все обновления объекта, избегая как сбоев, так и избыточных обновлений. В частности, обратите внимание на приостановленные потоки событий, индикатор, InhiBeans и мой пост в блоге о InhiBeans. Эти методы основаны на том факте, что распространение событий происходит синхронно, поэтому не переводятся в rxJava.

3. Очистить связь между производителем события и потребителем события.

Шина событий - это глобальный объект, на который любой может публиковать и подписываться. Связь между производителем событий и потребителем событий является косвенной и, следовательно, менее ясной. В случае реактивных потоков событий связь между производителем и потребителем гораздо более очевидна. Для сравнения:

Шина событий:

class A {
    public void f() {
        eventBus.post(evt);
    }
}

// during initialization
eventBus.register(consumer);
A a = new A();

Отношение между a а также consumer не ясно, глядя на код инициализации.

Потоки событий:

class A {
    public EventStream<MyEvent> events() { /* ... */ }
}

// during initialization
A a = new A();
a.events().subscribe(consumer);

Отношение между a а также consumer очень явно.

4. События, публикуемые объектом, проявляются в его API.

Используя пример из предыдущего раздела, в примере шины событий, AAPI не сообщает вам, какие события публикуются A, С другой стороны, в примере потоков событий AAPI утверждает, что экземпляры A публиковать события типа MyEvent,

Я думаю, что вы должны использовать rxjava, потому что он обеспечивает гораздо большую гибкость. Если вам нужен автобус, вы можете использовать перечисление вот так:

public enum Events {

  public static PublishSubject <Object> myEvent = PublishSubject.create ();
}

//where you want to publish something
Events.myEvent.onNext(myObject);

//where you want to receive an event
Events.myEvent.subscribe (...);

,

Я узнал одну или две вещи, так как я задал этот вопрос 2 года назад, вот мое текущее понимание (как объяснено в книге FRP Стивена):

Оба пытаются помочь описать конечный автомат, то есть описать, как состояние программы изменяется в ответ на события.

Ключевое различие между EventBus и FRP заключается в композиционности:

  • Что такое композиционный?

    • Функциональное программирование является композиционным. Мы все можем согласиться с этим. Мы можем взять любую чистую функцию, объединить ее с другими чистыми функциями, и мы получим более сложную чистую функцию.
    • Композиционность означает, что когда вы объявляете что-то, то на месте объявления определяется все поведение объявленной сущности.
  • FRP - это композиционный способ описания конечного автомата, а Event-Bus - нет. Зачем?

    • Это описывает конечный автомат.
    • Это композиционно, потому что описание сделано с использованием чистых функций и неизменных значений.
  • EventBus не является композиционным способом описания конечного автомата. Почему бы и нет?

    • Вы не можете взять любые две шины событий и скомпоновать их так, чтобы получить новую составную шину событий, описывающую составной конечный автомат. Почему бы и нет?
      • Шины для мероприятий не являются гражданами первого класса (в отличие от Event / Stream FRP) .
        • Что произойдет, если вы попытаетесь сделать туристические автобусы первоклассными гражданами?
          • Тогда вы получите что-то похожее на FRP/RX.
      • Состояние под влиянием событий не автобусы
        • граждане первого класса (то есть референтно прозрачные, чистые ценности в отличие от поведения / ячейки FRP)
        • привязаны к шинам событий декларативным / функциональным способом, вместо этого состояние изменено императивно, вызванным обработкой события

Таким образом, EventBus не является композиционным, потому что значение и поведение составного EventBus (то есть временная эволюция состояния, на которое влияет упомянутый составной EventBus), зависит от времени (то есть состояния тех частей программного обеспечения, которые не включены явно в объявлении составленного EventBus) . Другими словами, если бы я попытался объявить составной EventBus, то было бы невозможно определить (просто взглянув на объявление составной EventBus), какие правила управляют развитием состояний тех состояний, на которые влияет составной EventBus, это в отличие от FRP, где это можно сделать.)

Согласно моему комментарию выше, у JavaFx есть класс ObservableValue, который как бы соответствует RX Observable (наверное ConnectableObservable чтобы быть более точным, поскольку это позволяет более чем одной подписке). Я использую следующий неявный класс для преобразования из RX в JFX, например так:

import scala.collection.mutable.Map
import javafx.beans.InvalidationListener
import javafx.beans.value.ChangeListener
import javafx.beans.value.ObservableValue
import rx.lang.scala.Observable
import rx.lang.scala.Subscription

/**
 * Wrapper to allow interoperability bewteen RX observables and JavaFX
 * observables. 
 */
object JfxRxImplicitConversion {
  implicit class JfxRxObservable[T](theObs : Observable[T]) extends ObservableValue[T] { jfxRxObs =>
    val invalListeners : Map[InvalidationListener,Subscription] = Map.empty
    val changeListeners : Map[ChangeListener[_ >: T],Subscription] = Map.empty
    var last : T = _
    theObs.subscribe{last = _}

    override def getValue() : T = last 

    override def addListener(arg0 : InvalidationListener) : Unit = {
      invalListeners += arg0 -> theObs.subscribe { next : T => arg0.invalidated(jfxRxObs) }
    }

    override def removeListener(arg0 : InvalidationListener) : Unit = {
      invalListeners(arg0).unsubscribe
      invalListeners - arg0
    }

    override def addListener(arg0 : ChangeListener[_ >: T]) : Unit = {
      changeListeners += arg0 -> theObs.subscribe { next : T => arg0.changed(jfxRxObs,last,next) }
    }

    override def removeListener(arg0 : ChangeListener[_ >: T]) : Unit = {
      changeListeners(arg0).unsubscribe
      changeListeners - arg0
    }
  }
}

Затем позволяет использовать привязки свойств следующим образом (это ScalaFX, но соответствует Property.bind в JavaFX):

new Label {
    text <== rxObs
}

куда rxObs может быть например:

val rxObs : rx.Observable[String] = Observable.
  interval(1 second).
  map{_.toString}.
  observeOn{rx.lang.scala.schedulers.ExecutorScheduler(JavaFXExecutorService)} 

который просто счетчик, который увеличивается каждую секунду. Просто не забудьте импортировать неявный класс. Я не могу себе представить, что становится чище, чем это!

Вышесказанное немного запутано из-за необходимости использовать планировщик, который прекрасно работает с JavaFx. Смотрите этот вопрос для ссылки на суть того, как JavaFXExecutorService реализовано. Существует запрос на усовершенствование scala RX, чтобы превратить это в неявный аргумент, поэтому в будущем вам может не понадобиться .observeOn вызов.

Другие вопросы по тегам