Почему отображение класса A на класс B с помощью monix или akka-streams происходит так медленно?

Я сравнил отображение List[ClassA] с List[ClassB] с помощью потоков monix и akka, но я не понимаю, почему это так медленно.

Я пробовал другой способ сопоставления, и вот результат с JMH:

[info] Benchmark                                    Mode  Cnt    Score    Error  Units
[info] MappingBenchmark.akkaLoadBalanceMap            ss   20  742,626 â–’  4,853  ms/op
[info] MappingBenchmark.akkaMapAsyncFold              ss   20  480,460 â–’  8,493  ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync         ss   20  331,398 â–’ 10,490  ms/op
[info] MappingBenchmark.akkaMapFold                   ss   20  713,500 â–’  7,394  ms/op
[info] MappingBenchmark.akkaMapFoldAsync              ss   20  313,275 â–’  8,716  ms/op
[info] MappingBenchmark.map                           ss   20    0,567 â–’  0,175  ms/op
[info] MappingBenchmark.monixBatchedObservables       ss   20  259,736 â–’  5,939  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft         ss   20  456,310 â–’  5,225  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync    ss   20  795,345 â–’  5,443  ms/op
[info] MappingBenchmark.monixMapFoldLeft              ss   20  247,172 â–’  5,342  ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync         ss   20  478,840 â–’ 25,249  ms/op
[info] MappingBenchmark.monixTaskGather               ss   20    6,707 â–’  2,176  ms/op
[info] MappingBenchmark.parMap                        ss   20    1,257 â–’  0,831  ms/op

Вот код:

package benches

import java.util.concurrent.TimeUnit

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Warmup(iterations = 20)
@Measurement(iterations = 20)
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
@Threads(1)
class MappingBenchmark {
  import monix.eval._
  import monix.reactive._
  import monix.execution.Scheduler.Implicits.global

  def list: List[ClassA] = (1 to 10000).map(ClassA).toList
  //    val l = (1 to 135368).map(Offre).toList

  // ##### SCALA ##### //

  @Benchmark
  def map: List[ClassB] = list.map(o => ClassB(o, o))

  @Benchmark
  def parMap: List[ClassB] = list.par.map(o => ClassB(o, o)).toList

  // ##### MONIX ##### //

  @Benchmark
  def monixTaskGather: List[ClassB] = {
    val task: Task[List[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o))))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixBatchedObservables: List[ClassB] = {
    val task: Task[List[ClassB]] =
      Observable.fromIterable(list)
        .bufferIntrospective(256)
        .flatMap{items =>
          val tasks = items.map(o => Task(ClassB(o,o)))
          val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
          val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
          Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
        }.consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeft: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeftAsync: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeft: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeftAsync: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
    Await.result(task.runAsync, Duration.Inf)
  }

  // ##### AKKA-STREAM ##### //

  @Benchmark
  def akkaMapFold: List[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapFoldAsync: List[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFold: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFoldAsync: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaLoadBalanceMap: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = {
      val sink: Sink[ClassB, Future[List[ClassB]]] = Sink.fold(List[ClassB]())(_ :+ _)
      RunnableGraph.fromGraph[Future[List[ClassB]]](GraphDSL.create(sink) { implicit builder =>
        sink =>
          import GraphDSL.Implicits._
          val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
          val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
          val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
          Source(list) ~> balance
          (1 to 4).foreach{ i =>
            balance ~> mapClassB.async ~> merge
          }
          merge ~> sink
          ClosedShape
      })
    }
    runAkkaGraph(graph)
  }

  private def runAkkaGraph(g:RunnableGraph[Future[List[ClassB]]]): List[ClassB] = {
    implicit val actorSystem = ActorSystem("app")
    implicit val actorMaterializer = ActorMaterializer()
    val eventualBs = g.run()
    val res = Await.result(eventualBs, Duration.Inf)
    actorSystem.terminate()
    res
  }
}

case class ClassA(a:Int)
case class ClassB(o:ClassA, o2:ClassA)

Результат на скамейке становится еще хуже, когда начальная коллекция больше.

Я хотел бы знать, в чем моя ошибка.

Спасибо, что поделились своими знаниями!

С наилучшими пожеланиями

2 ответа

Решение

Я обновил код и стенд действительно лучше, чем раньше. Разница связана с оператором списка. Фактически, первая версия использовала append вместо preprend. Поскольку List является связанным списком, он должен был перебирать элементы, чтобы добавить новый. Будучи ленивым, я хотел использовать оператор _, но не должен был.

package benches

import java.util.concurrent.TimeUnit

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.collection.immutable.Seq

@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Warmup(iterations = 20)
@Measurement(iterations = 20)
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
@Threads(1)
class MappingBenchmark {
  import monix.eval._
  import monix.reactive._
  import monix.execution.Scheduler.Implicits.global

  def list: Seq[ClassA] = (1 to 10000).map(ClassA).toList
  //    val l = (1 to 135368).map(Offre).toList

  // ##### SCALA ##### //

  def foldClassB = (l:List[ClassB], o:ClassB) => o +: l

  @Benchmark
  def map: Seq[ClassB] = list.map(o => ClassB(o, o))

  @Benchmark
  def parMap: Seq[ClassB] = list.par.map(o => ClassB(o, o)).toList

  // ##### MONIX ##### //

  @Benchmark
  def monixTaskGather: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o))))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixBatchedObservables: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] =
      Observable.fromIterable(list)
        .bufferIntrospective(256)
        .flatMap{items =>
          val tasks = items.map(o => Task(ClassB(o,o)))
          val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
          val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
          Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
        }.consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeft: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeftAsync: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l)))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeft: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeftAsync: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l)))
    Await.result(task.runAsync, Duration.Inf)
  }

  // ##### AKKA-STREAM ##### //

  @Benchmark
  def akkaMapFold: Seq[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapFoldAsync: Seq[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapSeq: Seq[ClassB] = {
    val graph = Source(list).map(o => ClassB(o,o)).toMat(Sink.seq)(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFold: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFoldAsync: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncSeq: Seq[ClassB] = {
    val graph = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).toMat(Sink.seq)(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaLoadBalanceMap: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] = {
      val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.fold(List[ClassB]())(foldClassB)
      RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder =>
        sink =>
          import GraphDSL.Implicits._
          val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
          val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
          val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
          Source(list) ~> balance
          (1 to 4).foreach{ i =>
            balance ~> mapClassB.async ~> merge
          }
          merge ~> sink
          ClosedShape
      })
    }
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaLoadBalanceMapSeq: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] = {
      val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.seq
      RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder =>
        sink =>
          import GraphDSL.Implicits._
          val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
          val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
          val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
          Source(list) ~> balance
          (1 to 4).foreach{ i =>
            balance ~> mapClassB.async ~> merge
          }
          merge ~> sink
          ClosedShape
      })
    }
    runAkkaGraph(graph)
  }

  private def runAkkaGraph(g:RunnableGraph[Future[Seq[ClassB]]]): Seq[ClassB] = {
    implicit val actorSystem = ActorSystem("app")
    implicit val actorMaterializer = ActorMaterializer()
    val eventualBs = g.run()
    val res = Await.result(eventualBs, Duration.Inf)
    actorSystem.terminate()
    res
  }
}

case class ClassA(a:Int)
case class ClassB(o:ClassA, o2:ClassA)

Результат с этим обновленным классом:

[info] Benchmark                                    Mode  Cnt   Score   Error  Units
[info] MappingBenchmark.akkaLoadBalanceMap            ss   20  19,052 â–’ 3,779  ms/op
[info] MappingBenchmark.akkaLoadBalanceMapSeq         ss   20  16,115 â–’ 3,232  ms/op
[info] MappingBenchmark.akkaMapAsyncFold              ss   20  20,862 â–’ 3,127  ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync         ss   20  26,994 â–’ 4,010  ms/op
[info] MappingBenchmark.akkaMapAsyncSeq               ss   20  19,399 â–’ 7,089  ms/op
[info] MappingBenchmark.akkaMapFold                   ss   20  12,132 â–’ 4,111  ms/op
[info] MappingBenchmark.akkaMapFoldAsync              ss   20  22,652 â–’ 3,802  ms/op
[info] MappingBenchmark.akkaMapSeq                    ss   20  10,894 â–’ 3,114  ms/op
[info] MappingBenchmark.map                           ss   20   0,625 â–’ 0,193  ms/op
[info] MappingBenchmark.monixBatchedObservables       ss   20   9,175 â–’ 4,080  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft         ss   20  11,724 â–’ 4,458  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync    ss   20  14,174 â–’ 6,962  ms/op
[info] MappingBenchmark.monixMapFoldLeft              ss   20   1,057 â–’ 0,960  ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync         ss   20   9,638 â–’ 4,910  ms/op
[info] MappingBenchmark.monixTaskGather               ss   20   7,065 â–’ 2,428  ms/op
[info] MappingBenchmark.parMap                        ss   20   1,392 â–’ 0,923  ms/op

кажется, что все еще быстрее сопоставить со Scala, если мы можем, прежде чем запускать поток.

Просто заметка об асинхронной обработке / параллелизме... в общем, при параллельной обработке данных у вас возникает много накладных расходов, связанных с ЦП, для синхронизации результатов.

Затраты могут быть настолько значительными, что могут свести на нет выигрыш во времени, который вы получаете от нескольких процессорных ядер, работающих параллельно.

Вам также следует ознакомиться с Законом Амдала. Посмотрите на эти цифры: при параллельной части в 75% вы достигнете максимально возможного ускорения только с 4 процессорами. И с параллельной частью в 50% вы достигнете максимального ускорения только с 2 процессорами.

И это только теоретический предел, потому что у вас также есть синхронизация с общей памятью между процессорами, что может привести к серьезным проблемам; в основном процессоры оптимизированы для последовательного выполнения. Внесите проблемы параллелизма, и вам нужно форсировать упорядочение с помощью барьеров памяти, которые сводят на нет многие оптимизации процессора. И, таким образом, вы можете достичь отрицательного ускорения, как видно из ваших тестов.

Таким образом, вы тестируете асинхронное / параллельное сопоставление, но тест в основном ничего не делает, может с тем же успехом протестировать с помощью функции идентификации, и это будет почти то же самое. Другими словами, тест, который вы проводите, и его результаты практически бесполезны на практике.

И как примечание, это также, почему мне никогда не нравилась идея "параллельных коллекций". Концепция ошибочна, потому что вы можете использовать только параллельные коллекции для чисто связанных с процессором вещей (т.е. без ввода-вывода, без реальной асинхронной работы), что позволяет сказать, что это хорошо для выполнения некоторых вычислений, за исключением того, что:

  1. для многих целей использование параллельных коллекций медленнее, чем обычные операторы, которые используют один процессор и
  2. если у вас на самом деле есть работа с процессором и вам нужно максимально использовать свои аппаратные ресурсы, то "параллельные коллекции" в их текущем воплощении на самом деле являются неправильной абстракцией, потому что "оборудование" в наши дни включает в себя графические процессоры

Другими словами, параллельные коллекции не используют аппаратные ресурсы эффективно, так как они полностью игнорируют поддержку GPU и совершенно не подходят для смешанных задач ввода-вывода CPU, так как им не хватает поддержки асинхронности.

Я чувствую необходимость упомянуть об этом, потому что слишком часто люди думают, что растирание "параллельной" пиксельной пыли на их коде заставит его работать быстрее, но во многих случаях - нет.

Параллелизм прекрасно работает, когда у вас есть задачи, связанные с вводом / выводом (разумеется, смешанные с задачами, связанными с ЦП), и в этом случае нагрузка на ЦП намного менее значительна, потому что время обработки будет зависеть от ввода / вывода.

PS: простое отображение коллекций Scala должно быть быстрее, потому что оно строгое и (в зависимости от типа коллекции) использует буферы на основе массива и, таким образом, не очищает кэш процессора. Monix-х .map имеет такие же накладные расходы, как у Скалы Iterable.mapили, другими словами, почти нулевые накладные расходы, но их применение лениво и вводит некоторые накладные расходы на бокс, от которых мы не можем избавиться, потому что JVM не специализирует дженерики.

Это чертовски быстро на практике;-)

Другие вопросы по тегам