Наблюдаемые против будущей производительности
Я работаю с Vert.x 2.x ( http://vertx.io/), который широко использует асинхронные обратные вызовы. Они быстро становятся громоздкими с типичными проблемами ада вложенности / обратного вызова.
Я рассмотрел как Scala Futures/Promises (который, я думаю, будет подходом defacto), так и Reactive Extensions (RxScala).
Из моего тестирования я нашел некоторые интересные результаты производительности.
Мое тестирование довольно простое, я просто отправляю кучу HTTP-запросов (через weighttp) в vert.x, который выполняет асинхронный вызов через шину событий Vert.x и обрабатывает ответ, который затем возвращается в HTTP 200 ответ.
Я обнаружил следующее (производительность измеряется в единицах HTTP-запросов в секунду):
- Производительность асинхронного обратного вызова = 68 305 об / с
- Производительность Rx = 64,656 об / с
- Производительность Future/Promises = 61 376 об / с
Условия испытаний были:
- Mac Pro OS X Yosemite 10.10.2
- Oracle JVM 1.8U25
- weighttp версия 0.3
- Vert.x 2.1.5
- Scala 2.10.4
- RxScala 0.23.0
- 4 х экземпляра веб-службы
- 4 х инстанса для служебной вертикали
Тестовая команда была
weighttp -n 1000000 -c 128 -7 8 -k "localhost:8888"
Вышеприведенные цифры являются средним значением пяти тестовых прогонов, с лучшим и худшим результатом. Обратите внимание, что результаты очень согласованы с представленным средним (отклонение не более нескольких сотен оборотов в секунду).
Существует ли какая-либо известная причина, по которой может происходить вышеизложенное - то есть Rx > Futures в чистых запросах в секунду?
Реактивные расширения, на мой взгляд, превосходят их, поскольку они могут сделать гораздо больше, но, учитывая стандартный подход к асинхронным обратным вызовам, обычно кажется, что они идут по пути Futures / Promises. Я удивлен падением производительности.
РЕДАКТИРОВАТЬ: Вот вертикаль веб-службы
class WebVerticle extends Verticle {
override def start() {
val port = container.env().getOrElse("HTTP_PORT", "8888").toInt
val approach = container.env().getOrElse("APPROACH", "ASYNC")
container.logger.info("Listening on port: " + port)
container.logger.info("Using approach: " + approach)
vertx.createHttpServer.requestHandler { req: HttpServerRequest =>
approach match {
case "ASYNC" => sendAsync(req, "hello")
case "FUTURES" => sendWithFuture("hello").onSuccess { case body => req.response.end(body) }
case "RX" => sendWithObservable("hello").doOnNext(req.response.end(_)).subscribe()
}
}.listen(port)
}
// Async callback
def sendAsync(req: HttpServerRequest, body: String): Unit = {
vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
req.response.end(msg.body())
})
}
// Rx
def sendWithObservable(body: String) : Observable[String] = {
val subject = ReplaySubject[String]()
vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
subject.onNext(msg.body())
subject.onCompleted()
})
subject
}
// Futures
def sendWithFuture(body: String) : Future[String] = {
val promise = Promise[String]()
vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
promise.success(msg.body())
})
promise.future
}
}
РЕДАКТИРОВАТЬ: вот бэкэнд Veriment
class ServiceVerticle extends Verticle {
override def start(): Unit = {
vertx.eventBus.registerHandler("service.verticle", { msg: Message[String] =>
msg.reply("Hello Scala")
})
}
}