Время передачи сообщений Akka

Я работаю над искусственным симулятором жизни со Скалой и Аккой, и до сих пор я был очень доволен обоими. У меня есть некоторые проблемы со временем, однако, я не могу объяснить.

На данный момент каждое животное в моей симуляции - это пара актеров (животное + мозг). Как правило, эти два актера сменяются (животное посылает сенсорный ввод в мозг, ждет результата, воздействует на него и начинает заново). Время от времени, однако, животные должны взаимодействовать друг с другом, чтобы есть друг друга или размножаться.

Одна странная вещь для меня - это время. Оказывается, что отправка сообщения от одного животного другому медленнее (примерно в 100 раз), чем от животного к мозгу. Это ставит моих бедных хищников и сексуально активных животных в невыгодное положение по сравнению с вегетарианцами и бесполыми существами (отказ от ответственности: я сам вегетарианец, но я думаю, что есть и более веские причины быть вегетарианцем, чем немного застрять при попытке охоты)..).

Я извлек минимальный фрагмент кода, который демонстрирует проблему:

package edu.blindworld.test

import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, ActorSystem, Props, Actor}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random

class Animal extends Actor {
  val brain = context.actorOf(Props(classOf[Brain]))
  var animals: Option[List[ActorRef]] = None

  var brainCount = 0
  var brainRequestStartTime = 0L
  var brainNanos = 0L

  var peerCount = 0
  var peerRequestStartTime = 0L
  var peerNanos = 0L

  override def receive = {
    case Go(all) =>
      animals = Some(all)
      performLoop()
    case BrainResponse =>
      brainNanos += (System.nanoTime() - brainRequestStartTime)
      brainCount += 1
      // Animal interactions are rare
      if (Random.nextDouble() < 0.01) {
        // Send a ping to a random other one (or ourselves). Defer our own loop
        val randomOther = animals.get(Random.nextInt(animals.get.length))
        peerRequestStartTime = System.nanoTime()
        randomOther ! PeerRequest
      } else {
        performLoop()
      }
    case PeerResponse =>
      peerNanos += (System.nanoTime() - peerRequestStartTime)
      peerCount += 1
      performLoop()
    case PeerRequest =>
      sender() ! PeerResponse
    case Stop =>
      sender() ! StopResult(brainCount, brainNanos, peerCount, peerNanos)
      context.stop(brain)
      context.stop(self)
  }

  def performLoop() = {
    brain ! BrainRequest
    brainRequestStartTime = System.nanoTime()
  }
}

class Brain extends Actor {
  override def receive = {
    case BrainRequest =>
      sender() ! BrainResponse
  }
}

case class Go(animals: List[ActorRef])
case object Stop
case class StopResult(brainCount: Int, brainNanos: Long, peerCount: Int, peerNanos: Long)

case object BrainRequest
case object BrainResponse

case object PeerRequest
case object PeerResponse

object ActorTest extends App {
  println("Sampling...")
  val system = ActorSystem("Test")
  val animals = (0 until 50).map(i => system.actorOf(Props(classOf[Animal]))).toList
  animals.foreach(_ ! Go(animals))
  Thread.sleep(5000)
  implicit val timeout = Timeout(5, TimeUnit.SECONDS)
  val futureStats = animals.map(_.ask(Stop).mapTo[StopResult])
  val stats = futureStats.map(Await.result(_, Duration(5, TimeUnit.SECONDS)))
  val brainCount = stats.foldLeft(0)(_ + _.brainCount)
  val brainNanos = stats.foldLeft(0L)(_ + _.brainNanos)
  val peerCount = stats.foldLeft(0)(_ + _.peerCount)
  val peerNanos = stats.foldLeft(0L)(_ + _.peerNanos)
  println("Average time for brain request: " + (brainNanos / brainCount) / 1000000.0 + "ms (sampled from " + brainCount + " requests)")
  println("Average time for peer pings: " + (peerNanos / peerCount) / 1000000.0 + "ms (sampled from " + peerCount + " requests)")
  system.shutdown()
}

Вот что здесь происходит:

  • Я создаю 50 пар животных / актеров мозга
  • Все они запускаются и работают в течение 5 секунд
  • Каждое животное делает бесконечный цикл, по очереди с его мозгом
  • В 1% всех запусков животное посылает пинг случайному другому животному и ждет его ответа. Затем он продолжает свой цикл со своим мозгом
  • Каждый запрос к мозгу и сверстникам измеряется, так что мы можем получить среднее
  • Через 5 секунд все останавливается и сравнивается время для мозговых запросов и пингов для пиров

На моем двухъядерном i7 я вижу эти цифры:

Среднее время для мозгового запроса: 0,004708 мс (выборка из 21073859 запросов)

Среднее время для пингов одноранговой сети: 0,66866 мс (выборка из 211167 запросов)

Таким образом, пинг для пиров в 165 раз медленнее, чем запросы к мозгу. Я пробовал много вещей, чтобы исправить это (например, приоритетные почтовые ящики и прогрев JIT), но не смог понять, что происходит. У кого-нибудь есть идея?

1 ответ

Я думаю, что вы должны использовать шаблон запроса для обработки сообщения. В вашем коде BrainRequest был отправлен мозговому субъекту, а затем отправил обратно BrainResponse. Проблема была здесь. BrainResponse не был ответом BrainRequest. Возможно, это был предыдущий ответ BrainRequest.

В следующем коде используется шаблон Ask, и результат perf практически одинаков.

package edu.blindworld.test

import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, ActorSystem, Props, Actor}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

class Animal extends Actor {
  val brain = context.actorOf(Props(classOf[Brain]))
  var animals: Option[List[ActorRef]] = None

  var brainCount = 0
  var brainRequestStartTime = 0L
  var brainNanos = 0L

  var peerCount = 0
  var peerRequestStartTime = 0L
  var peerNanos = 0L

  override def receive = {
    case Go(all) =>
      animals = Some(all)
      performLoop()
    case PeerRequest =>
      sender() ! PeerResponse
    case Stop =>
      sender() ! StopResult(brainCount, brainNanos, peerCount, peerNanos)
      context.stop(brain)
      context.stop(self)
  }

  def performLoop(): Unit = {
    brainRequestStartTime = System.nanoTime()
    brain.ask(BrainRequest)(10.millis) onSuccess {
      case _ =>
        brainNanos += (System.nanoTime() - brainRequestStartTime)
        brainCount += 1
        // Animal interactions are rare
        if (Random.nextDouble() < 0.01) {
          // Send a ping to a random other one (or ourselves). Defer our own loop
          val randomOther = animals.get(Random.nextInt(animals.get.length))
          peerRequestStartTime = System.nanoTime()
          randomOther.ask(PeerRequest)(10.millis) onSuccess {
            case _ =>
              peerNanos += (System.nanoTime() - peerRequestStartTime)
              peerCount += 1
              performLoop()
          }
        } else {
          performLoop()
        }
    }
  }
}

class Brain extends Actor {
  override def receive = {
    case BrainRequest =>
      sender() ! BrainResponse
  }
}

case class Go(animals: List[ActorRef])
case object Stop
case class StopResult(brainCount: Int, brainNanos: Long, peerCount: Int, peerNanos: Long)

case object BrainRequest
case object BrainResponse

case object PeerRequest
case object PeerResponse

object ActorTest extends App {
  println("Sampling...")
  val system = ActorSystem("Test")
  val animals = (0 until 50).map(i => system.actorOf(Props(classOf[Animal]))).toList
  animals.foreach(_ ! Go(animals))
  Thread.sleep(5000)
  implicit val timeout = Timeout(5, TimeUnit.SECONDS)
  val futureStats = animals.map(_.ask(Stop).mapTo[StopResult])
  val stats = futureStats.map(Await.result(_, Duration(5, TimeUnit.SECONDS)))
  val brainCount = stats.foldLeft(0)(_ + _.brainCount)
  val brainNanos = stats.foldLeft(0L)(_ + _.brainNanos)
  val peerCount = stats.foldLeft(0)(_ + _.peerCount)
  val peerNanos = stats.foldLeft(0L)(_ + _.peerNanos)
  println("Average time for brain request: " + (brainNanos / brainCount) / 1000000.0 + "ms (sampled from " + brainCount + " requests)")
  println("Average time for peer pings: " + (peerNanos / peerCount) / 1000000.0 + "ms (sampled from " + peerCount + " requests)")
  system.shutdown()
}
Другие вопросы по тегам