Наблюдаемые против будущей производительности

Я работаю с Vert.x 2.x ( http://vertx.io/), который широко использует асинхронные обратные вызовы. Они быстро становятся громоздкими с типичными проблемами ада вложенности / обратного вызова.

Я рассмотрел как Scala Futures/Promises (который, я думаю, будет подходом defacto), так и Reactive Extensions (RxScala).

Из моего тестирования я нашел некоторые интересные результаты производительности.

Мое тестирование довольно простое, я просто отправляю кучу HTTP-запросов (через weighttp) в vert.x, который выполняет асинхронный вызов через шину событий Vert.x и обрабатывает ответ, который затем возвращается в HTTP 200 ответ.

Я обнаружил следующее (производительность измеряется в единицах HTTP-запросов в секунду):

  • Производительность асинхронного обратного вызова = 68 305 об / с
  • Производительность Rx = 64,656 об / с
  • Производительность Future/Promises = 61 376 об / с

Условия испытаний были:

  • Mac Pro OS X Yosemite 10.10.2
  • Oracle JVM 1.8U25
  • weighttp версия 0.3
  • Vert.x 2.1.5
  • Scala 2.10.4
  • RxScala 0.23.0
  • 4 х экземпляра веб-службы
  • 4 х инстанса для служебной вертикали

Тестовая команда была

weighttp -n 1000000 -c 128 -7 8 -k "localhost:8888"

Вышеприведенные цифры являются средним значением пяти тестовых прогонов, с лучшим и худшим результатом. Обратите внимание, что результаты очень согласованы с представленным средним (отклонение не более нескольких сотен оборотов в секунду).

Существует ли какая-либо известная причина, по которой может происходить вышеизложенное - то есть Rx > Futures в чистых запросах в секунду?

Реактивные расширения, на мой взгляд, превосходят их, поскольку они могут сделать гораздо больше, но, учитывая стандартный подход к асинхронным обратным вызовам, обычно кажется, что они идут по пути Futures / Promises. Я удивлен падением производительности.

РЕДАКТИРОВАТЬ: Вот вертикаль веб-службы

class WebVerticle extends Verticle {
  override def start() {
    val port = container.env().getOrElse("HTTP_PORT", "8888").toInt
    val approach = container.env().getOrElse("APPROACH", "ASYNC")
    container.logger.info("Listening on port: " + port)
    container.logger.info("Using approach: " + approach)

    vertx.createHttpServer.requestHandler { req: HttpServerRequest =>
      approach match {
        case "ASYNC" => sendAsync(req, "hello")
        case "FUTURES" => sendWithFuture("hello").onSuccess { case body => req.response.end(body) }
        case "RX" => sendWithObservable("hello").doOnNext(req.response.end(_)).subscribe()
      }
    }.listen(port)
  }

  // Async callback
  def sendAsync(req: HttpServerRequest, body: String): Unit = {
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      req.response.end(msg.body())
    })
  }

  // Rx 
  def sendWithObservable(body: String) : Observable[String] = {
    val subject = ReplaySubject[String]()
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      subject.onNext(msg.body())
      subject.onCompleted()
    })
    subject
  }

  // Futures    
  def sendWithFuture(body: String) : Future[String] = {
    val promise = Promise[String]()
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      promise.success(msg.body())
    })
    promise.future
  }
}

РЕДАКТИРОВАТЬ: вот бэкэнд Veriment

class ServiceVerticle extends Verticle {
  override def start(): Unit = {
    vertx.eventBus.registerHandler("service.verticle", { msg: Message[String] =>
      msg.reply("Hello Scala")
    })
  }
}

0 ответов

Другие вопросы по тегам