Факторный расчет с использованием актеров Scala

Как вычислить факториал с использованием акторов Scala?

И будет ли это более эффективным по сравнению с, например,

def factorial(n: Int): BigInt = (BigInt(1) to BigInt(n)).par.product

Большое спасибо.

1 ответ

Решение

проблема

Вы должны разделить свой вклад в частичные продукты. Эти частичные произведения затем могут быть рассчитаны параллельно. Частичные продукты затем умножаются, чтобы получить конечный продукт.

Это может быть сведено к более широкому классу проблем: так называемое вычисление префикса Parallel. Вы можете прочитать об этом в Википедии.

Короткая версия: когда вы рассчитываете a*b*c*d с ассоциативной операцией _ * _Вы можете структурировать расчет a*(b*(c*d)) или же (a*b)*(c*d), При втором подходе вы можете рассчитать a*b а также c*d параллельно, а затем рассчитать окончательный результат из этих частичных результатов. Конечно, вы можете сделать это рекурсивно, когда у вас есть большее количество входных значений.

Решение

отказ

Это звучит немного как домашнее задание. Поэтому я предоставлю решение, которое имеет два свойства:

  1. Содержит небольшую ошибку
  2. Он показывает, как решить параллельный префикс в целом, без непосредственного решения проблемы.

Таким образом, вы можете видеть, как решение должно быть структурировано, но никто не может использовать его, чтобы обмануть ее домашнюю работу.

Решение в деталях

Сначала мне нужно немного импорта

import akka.event.Logging import java.util.concurrent.TimeUnit import scala.concurrent.duration.FiniteDuration импорт akka.actor._

Затем я создаю несколько вспомогательных классов для общения между актерами.

case class Calculate[T](values : Seq[T], segment : Int, parallelLimit : Int, fn : (T,T) => T)

trait CalculateResponse
case class CalculationResult[T](result : T, index : Int) extends CalculateResponse
case object Busy extends CalculateResponse

Вместо того, чтобы сказать получателю, что вы заняты, актер может также использовать тайник или реализовать собственную очередь для частичных результатов. Но в этом случае я думаю, что отправитель должен решить, сколько параллельных вычислений разрешено.

Теперь я создаю актера:

class ParallelPrefixActor[T] extends Actor {
  val log = Logging(context.system, this)
  val subCalculation = Props(classOf[ParallelPrefixActor[BigInt]])
  val fanOut = 2
  def receive = waitForCalculation

  def waitForCalculation : Actor.Receive = {
    case c : Calculate[T] =>
      log.debug(s"Start calculation for ${c.values.length} values, segment nr. ${c.index}, from ${c.values.head} to ${c.values.last}")
      if (c.values.length < c.parallelLimit) {
        log.debug("Calculating result direct")
        val result = c.values.reduceLeft(c.fn)
        sender ! CalculationResult(result, c.index)
      }else{
        val groupSize: Int = Math.max(1, (c.values.length / fanOut) + Math.min(c.values.length % fanOut, 1))
        log.debug(s"Splitting calculation for ${c.values.length} values up to ${fanOut} children, ${groupSize} elements each, limit ${c.parallelLimit}")
        def segments=c.values.grouped(groupSize)
        log.debug("Starting children")
        segments.zipWithIndex.foreach{case (values, index) =>
          context.actorOf(subCalculation) ! c.copy(values = values, index = index)
        }
        val partialResults: Vector[T] = segments.map(_.head).to[Vector]
        log.debug(s"Waiting for ${partialResults.length} results (${partialResults.indices})")
        context.become(waitForResults(segments.length, partialResults, c, sender), discardOld = true)
      }
  }
  def waitForResults(outstandingResults : Int, partialResults : Vector[T], originalRequest : Calculate[T], originalSender : ActorRef) : Actor.Receive = {
    case c : Calculate[_] => sender ! Busy
    case r : CalculationResult[T] =>
      log.debug(s"Putting result ${r.result} on position ${r.index} in ${partialResults.length}")
      val updatedResults = partialResults.updated(r.index, r.result)
      log.debug("Killing sub-worker")
      sender ! PoisonPill
      if (outstandingResults==1) {
        log.debug("Calculating result from partial results")
        val result = updatedResults.reduceLeft(originalRequest.fn)
        originalSender ! CalculationResult(result, originalRequest.index)
        context.become(waitForCalculation, discardOld = true)
      }else{
        log.debug(s"Still waiting for ${outstandingResults-1} results")
        // For fanOut > 2 one could here already combine consecutive partial results
        context.become(waitForResults(outstandingResults-1, updatedResults, originalRequest, originalSender), discardOld = true)
      }
  }
}

Оптимизации

Использование параллельного вычисления префикса не является оптимальным. Актеры, вычисляющие произведение больших чисел, будут выполнять гораздо больше работы, чем актеры, вычисляющие произведение меньших чисел (например, при расчете 1 * ... * 100 , это быстрее рассчитать 1 * ... * 10 чем 90 * ... * 100). Поэтому было бы неплохо перемешать числа, чтобы большие числа смешивались с маленькими. Это работает в этом случае, потому что мы используем коммутативную операцию. Обычно для вычисления параллельного префикса требуется только ассоциативная операция.

Спектакль

Теоретически

Производительность актерского решения хуже, чем "наивное" решение (использующее параллельные коллекции) для небольших объемов данных. Актерское решение будет блестящим, когда вы будете выполнять сложные вычисления или распределять свои вычисления на специализированном оборудовании (например, видеокарте или ПЛИС) или на нескольких компьютерах. С помощью актера вы можете контролировать, кто выполняет какие вычисления, и вы даже можете перезапустить "висящие вычисления". Это может дать большую скорость.

На одной машине решение актера может помочь, если у вас неоднородная архитектура памяти. Затем вы могли бы организовать актеров таким образом, чтобы связать память с определенным процессором.

Некоторые измерения

Я провел реальное измерение производительности, используя рабочий лист Scala в IntelliJ IDEA.

Сначала я настроил систему актеров:

// Setup the actor system
val system = ActorSystem("root")
// Start one calculation actor
val calculationStart = Props(classOf[ParallelPrefixActor[BigInt]])


val calcolon = system.actorOf(calculationStart, "Calcolon-BigInt")

val inbox = Inbox.create(system)

Затем я определил вспомогательный метод для измерения времени:

// Helper function to measure time
def time[A] (id : String)(f: => A) = {
  val start = System.nanoTime()
  val result = f
  val stop = System.nanoTime()
  println(s"""Time for "${id}": ${(stop-start)*1e-6d}ms""")
  result
}

А потом я сделал некоторые измерения производительности:

// Test code
val limit = 10000
def testRange = (1 to limit).map(BigInt(_))

time("par product")(testRange.par.product)
val timeOut = FiniteDuration(240, TimeUnit.SECONDS)
inbox.send(calcolon, Calculate[BigInt]((1 to limit).map(BigInt(_)), 0, 10, _ * _))
time("actor product")(inbox.receive(timeOut))

time("par sum")(testRange.par.sum)
inbox.send(calcolon, Calculate[BigInt](testRange, 0, 5, _ + _))
time("actor sum")(inbox.receive(timeOut))

Я получил следующие результаты

> Время "номинального продукта": 134,38289мс
  res0: scala.math.BigInt = 284625968091705451890641321211986889014805140170279923
  079417999427441134000376444377299078675778477581588406214231752883004233994015
  351873905242116138271617481982419982759241828925978789812425312059465996259867
  065601615720360323979263287367170557419759620994797203461536981198970926112775
  004841988454104755446424421365733030767036288258035489674611170973695786036701
  910715127305872810411586405612811653853259684258259955846881464304255898366493
  170592517172042765974074461334000541940524623034368691540594040662278282483715
  120383221786446271838229238996389928272218797024593876938030946273322925705554
  596900278752822425443480211275590191694254290289169072190970836905398737474524
  833728995218023632827412170402680867692104515558405671725553720158521328290342
  799898184493136...

  Время для "актерского продукта": 1310.217247мс
  res2: Any = CalculationResult(28462596809170545189064132121198688901480514017027
  992307941799942744113400037644437729907867577847758158840621423175288300423399
  401535187390524211613827161748198241998275924182892597878981242531205946599625
  986706560161572036032397926328736717055741975962099479720346153698119897092611
  277500484198845410475544642442136573303076703628825803548967461117097369578603
  670191071512730587281041158640561281165385325968425825995584688146430425589836
  649317059251717204276597407446133400054194052462303436869154059404066227828248
  371512038322178644627183822923899638992827221879702459387693803094627332292570
  555459690027875282242544348021127559019169425429028916907219097083690539873747
  452483372899521802363282741217040268086769210451555840567172555372015852132829
  034279989818449...

> Время для "номинальной суммы": 6,488620999999999мс
  res3: scala.math.BigInt = 50005000

> Время для "актерской суммы": 657,752832мс
  res5: Any = CalculationResult(50005000,0)

Вы можете легко увидеть, что версия актера намного медленнее, чем при использовании параллельных коллекций.

Другие вопросы по тегам