Какой "EventBus" использовать в Spring? Встроенный, Реактор, Акка?
Мы собираемся запустить новое приложение Spring 4 через несколько недель. И мы хотели бы использовать некоторую управляемую событиями архитектуру. В этом году я читал кое-что о "Реакторе" и, отыскивая его в Интернете, я наткнулся на "Акку".
Итак, на данный момент у нас есть 3 варианта:
- Спринга
ApplicationEvent
: http://docs.spring.io/spring/docs/4.0.0.RELEASE/javadoc-api/org/springframework/context/ApplicationEvent.html Reactor
: https://github.com/reactor/reactorAkka
: http://akka.io/
Я не мог найти реальное сравнение тех.
Сейчас нам просто нужно что-то вроде:
X
регистры для прослушиванияEvent E
Y
регистры для прослушиванияEvent E
Z
отправляетEvent E
А потом X
а также Y
получит и обработает событие.
Мы, скорее всего, будем использовать это асинхронно, но наверняка будут и некоторые синхронные сценарии. И мы, скорее всего, всегда отправляем класс как событие. (Образцы Reactor в основном используют шаблоны Strings и String, но также поддерживают Objects).
Насколько я понял, ApplicationEvent
по умолчанию работает синхронно и Reactor
работает асинхронно. А также Reactor
также позволяет использовать await()
способ сделать это вроде синхронно. Akka
обеспечивает более или менее так же, как Reactor
, но также поддерживает удаленное взаимодействие.
По поводу реактора await()
Метод: Может ли он ждать завершения нескольких потоков? Или, может быть, даже частичный набор этих тем? Если мы возьмем пример сверху:
X
регистры для прослушиванияEvent E
Y
регистры для прослушиванияEvent E
Z
отправляетEvent E
Можно ли сделать это синхронно, сказав: X
а также Y
завершить. И можно ли заставить его ждать только X
, но не для Y
?
Может быть, есть и альтернативы? Как насчет, например, JMS?
Много вопросов, но, надеюсь, вы можете дать некоторые ответы!
Спасибо!
РЕДАКТИРОВАТЬ: пример использования
Когда происходит конкретное событие, я хотел бы создать 10000 электронных писем. Каждое письмо должно генерироваться с пользовательским контентом. Поэтому я бы создал много потоков (max = системные ядра процессора), которые создают письма и не блокируют поток вызывающих, потому что это может занять несколько минут.
Когда происходит конкретное событие, я хотел бы получить информацию от неизвестного количества служб. Каждый выбор занимает около 100 мс. Здесь я мог представить себе использование Reactor's
await
потому что мне нужна эта информация для продолжения моей работы в главном потоке.Когда происходит конкретное событие, я хотел бы выполнить некоторые операции в зависимости от конфигурации приложения. Таким образом, приложение должно иметь возможность динамически (не) регистрировать comsumers/ обработчики событий. Они будут делать свое дело с Событием, и мне все равно. Поэтому я бы создал поток для каждого из этих обработчиков и просто продолжил свою работу в основном потоке.
Простая развязка: я в основном знаю все получатели, но я просто не хочу вызывать каждый получатель в своем коде. В основном это должно быть сделано синхронно.
Звучит так, как будто мне нужен ThreadPool или RingBuffer. Есть ли у этих структур динамические RingBuffers, которые увеличиваются в размере, если это необходимо?
3 ответа
Я не уверен, что смогу адекватно ответить на ваш вопрос в этом небольшом пространстве. Но я сделаю это!:)
Спринга ApplicationEvent
Система и Reactor действительно отличаются друг от друга по функциональности. ApplicationEvent
маршрутизация основана на типе, обрабатываемом ApplicationListener
, Что-нибудь более сложное, чем это, и вам придется реализовать логику самостоятельно (хотя это не обязательно плохо). Reactor, однако, предоставляет комплексный уровень маршрутизации, который также очень легкий и полностью расширяемый. Любое сходство функций между этими двумя функциями заканчивается их способностью подписываться и публиковать события, что действительно является особенностью любой управляемой событиями системы. Также не забывайте новое spring-messaging
Модуль вышел из Spring 4. Это подмножество инструментов, доступных в Spring Integration, а также предоставляет абстракции для построения на основе событийно-ориентированной архитектуры.
Reactor поможет вам решить пару ключевых проблем, которые в противном случае вам пришлось бы решать самостоятельно:
Соответствие селектора: Reactor делает Selector
сопоставление, которое охватывает диапазон совпадений - от простого .equals(Object other)
вызов к более сложному сопоставлению шаблонов URI, которое позволяет извлекать заполнители. Вы также можете расширить встроенные селекторы с помощью своей собственной логики, чтобы вы могли использовать расширенные объекты в качестве ключей уведомлений (например, объекты домена).
API Stream и Promise: вы упомянули Promise
API уже со ссылкой на .await()
метод, который действительно предназначен для существующего кода, который ожидает поведение блокировки. При написании нового кода с использованием Reactor нельзя слишком сильно подчеркивать использование композиций и обратных вызовов для эффективного использования системных ресурсов, не блокируя потоки. Блокировка вызывающей стороны почти никогда не является хорошей идеей в архитектуре, которая зависит от небольшого количества потоков для выполнения большого объема задач. Фьючерсы просто не масштабируются в облаке, поэтому современные приложения используют альтернативные решения.
Ваше приложение может быть спроектировано с помощью потоков или обещаний, хотя, если честно, я думаю, вы найдете Stream
более гибкий. Основным преимуществом является комбинируемость API, которая позволяет связывать действия вместе в цепочке зависимостей без блокировки. В качестве полностью неконструктивного примера, основанного на вашем сценарии использования электронной почты, вы описываете:
@Autowired
Environment env;
@Autowired
SmtpClient client;
// Using a ThreadPoolDispatcher
Deferred<DomainObject, Stream<DomainObject>> input = Streams.defer(env, THREAD_POOL);
input.compose()
.map(new Function<DomainObject, EmailTemplate>() {
public EmailTemplate apply(DomainObject in) {
// generate the email
return new EmailTemplate(in);
}
})
.consume(new Consumer<EmailTemplate>() {
public void accept(EmailTemplate email) {
// send the email
client.send(email);
}
});
// Publish input into Deferred
DomainObject obj = reader.readNext();
if(null != obj) {
input.accept(obj);
}
Реактор также обеспечивает Границу, которая в основном CountDownLatch
для блокировки на произвольных потребителей (так что вам не нужно создавать Promise
если все, что вы хотите сделать, это заблокировать Consumer
завершение). Вы могли бы использовать сырье Reactor
в этом случае и использовать on()
а также notify()
методы запуска проверки статуса сервиса.
Однако для некоторых вещей кажется, что вы хотите Future
вернулся из ExecutorService
нет? Почему бы просто не сделать вещи простыми? Реактор будет иметь реальную выгоду только в тех случаях, когда важны производительность и производительность. Если вы блокируете вызывающий поток, то, скорее всего, вы будете уничтожать выигрыш в эффективности, который Reactor даст вам в любом случае, так что в этом случае вам будет лучше, если использовать более традиционный набор инструментов.
Хорошая особенность открытости Reactor заключается в том, что ничто не мешает им взаимодействовать. Вы можете свободно смешивать Futures
с Consumers
без статики. В этом случае, просто имейте в виду, что вы когда-либо будете работать так же быстро, как ваш самый медленный компонент.
Давайте проигнорируем весну ApplicationEvent
поскольку он действительно не предназначен для того, что вы просите (это больше об управлении жизненным циклом бобов).
Что вам нужно выяснить, если вы хотите сделать это
- объектно-ориентированный путь (т.е. актеры, динамические потребители, зарегистрированные на лету) ИЛИ
- способ обслуживания (статические потребители, зарегистрированные при запуске).
Используя ваш пример X
а также Y
они:
- эфемерные случаи (1) или они
- долгоживущие синглтоны / сервисные объекты (2)?
Если вам нужно регистрировать потребителей на лету, тогда Akka - хороший выбор (я не уверен насчет реактора, так как никогда не использовал его). Если вы не хотите потреблять эфемерные объекты, вы можете использовать JMS или AMQP.
Вы также должны понимать, что такого рода библиотеки пытаются решить две проблемы:
- Параллелизм (то есть параллельное выполнение операций на одной и той же машине)
- Распределение (то есть параллельная работа на нескольких машинах)
Реактор и Акка в основном сосредоточены на #1. Акка недавно добавила поддержку кластеров, а абстракция актера облегчает выполнение #2. Очереди сообщений (JMS, AMQP) ориентированы на #2.
Для своей работы я делаю маршрут обслуживания и использую сильно модифицированные Guava EventBus и RabbitMQ. Я использую аннотации, подобные Guava Eventbus, но также есть аннотации для объектов, отправляемых на шину, однако вы можете просто использовать EventBus Guava в асинхронном режиме в качестве POC, а затем создать свой собственный, как я.
Вы можете подумать, что вам нужны динамические потребители (1), но большинство проблем можно решить с помощью простого паба / саба. Также может быть сложно управлять динамическими потребителями (следовательно, Akka - хороший выбор, потому что модель актора имеет для этого все виды управления)
Тщательно определите, что вы хотите от рамок. Если фреймворк имеет больше возможностей, чем вам нужно, это не всегда хорошо. Больше возможностей означает больше ошибок, больше кода для изучения и меньшую производительность.
Некоторые особенности, о которых нужно беспокоиться:
- характер актеров (нити или легкие предметы)
- умение работать на машинном кластере (акка)
- постоянные очереди сообщений (JMS)
- специфические функции, такие как сигналы (события без информации), переходы (объекты для объединения сообщений из разных портов в сложные события, см. сети Петри) и т. д.
Будьте осторожны с синхронными функциями, такими как await - он блокирует весь поток и опасен, когда актеры выполняются в пуле потоков (голодание потока).
Больше рамок для просмотра:
Fork-Join Pool - в некоторых случаях позволяет await
без нити голодания
Научные системы документооборота
Каркас потока данных для Java - сигналы, переходы
ADD-ON: два вида актеров.
Как правило, параллельная рабочая система может быть представлена в виде графа, где активные узлы отправляют сообщения друг другу. В Java, как и в большинстве других основных языков, активные узлы (акторы) могут быть реализованы в виде потоков или задач (Runnable или Callable), выполняемых пулом потоков. Обычно часть действующих лиц - это потоки, а часть - задачи. Оба подхода имеют свои преимущества и недостатки, поэтому крайне важно выбрать наиболее подходящую реализацию для каждого участника системы. Вкратце, потоки могут блокировать (и ждать события), но потреблять много памяти для своих стеков. Задачи могут не блокировать, но использовать общие стеки (потоков в пуле).
Если задача вызывает блокирующую операцию, она исключает объединенный поток из службы. Если многие задачи блокируют, они могут исключить все потоки, вызывая взаимоблокировку - те задачи, которые могут разблокировать заблокированные задачи, не могут быть запущены. Этот вид тупика называется нить голодом. Если, пытаясь предотвратить голодание потоков, сконфигурировать пул потоков как неограниченный, мы просто преобразуем задачи в потоки, теряя преимущества задач.
Чтобы исключить вызовы блокирующих операций в задачах, задача должна быть разбита на две (или более) - первая задача вызывает блокирующую операцию и завершается, а остальная часть форматируется как асинхронная задача, запускаемая после завершения блокирующей операции. Конечно, операция блокировки должна иметь альтернативный асинхронный интерфейс. Так, например, вместо синхронного чтения сокетов следует использовать библиотеки NIO или NIO2.
К сожалению, в стандартной библиотеке Java отсутствуют асинхронные аналоги для популярных средств синхронизации, таких как очереди и семафоры. К счастью, их легко реализовать с нуля (примеры см. В разделе Dataflow для Java).
Таким образом, выполнение вычислений исключительно с неблокирующими задачами возможно, но увеличивает размер кода. Очевидный совет - использовать потоки, где это возможно, и задачи только для простых массивных вычислений.