Параллельный запуск этапов Akka Streams значительно увеличивает нагрузку на память
Я пытаюсь реализовать поток Akka, который читает кадры из видеофайла и применяет классификатор SVM для обнаружения объектов в каждом кадре. Обнаружение может выполняться параллельно, поскольку порядок видеокадров не имеет значения. Моя идея состоит в том, чтобы создать график, который следует за Поваренной книгой Akka Streams ( Балансирование заданий фиксированному пулу работников), имеющей две стадии обнаружения, помеченные как .async
,
Он работает в определенной степени, как и ожидалось, но я заметил, что нагрузка на память моей системы (доступно только 8 ГБ) резко возрастает и значительно замедляет работу системы. Сравнивая это с другим подходом, который использует .mapAsync
( Akka Docs), объединяя даже трех действующих лиц в поток, выполняющий обнаружение объекта, нагрузка на память значительно ниже.
Что мне не хватает? Почему параллельный запуск двух этапов увеличивает нагрузку на память, а три параллельных актера работают нормально?
Дополнительные замечания: я использую OpenCV для чтения видеофайла. Из-за разрешения 4K, каждый видеокадр типа Mat
составляет около 26,5 МБ.
Запуск двух этапов параллельно с .async
резко увеличивая нагрузку на память
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(actorSystem)
.withInputBuffer(initialSize = 1, maxSize = 1)
.withOutputBurstLimit(1)
.withSyncProcessingLimit(2)
)
val greyscaleConversion: Flow[Frame, Frame, NotUsed] =
Flow[Frame].map { el => Frame(el.videoPos, FrameTransformation.transformToGreyscale(el.frame)) }
val objectDetection: Flow[Frame, DetectedObjectPos, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val numberOfDetectors = 2
val frameBalance: UniformFanOutShape[Frame, Frame] = builder.add(Balance[Frame](numberOfDetectors, waitForAllDownstreams = true))
val detectionMerge: UniformFanInShape[DetectedObjectPos, DetectedObjectPos] = builder.add(Merge[DetectedObjectPos](numberOfDetectors))
for (i <- 0 until numberOfDetectors) {
val detectionFlow: Flow[Frame, DetectedObjectPos, NotUsed] = Flow[Frame].map { greyFrame =>
val classifier = new CascadeClassifier()
classifier.load("classifier.xml")
val detectedObjects: MatOfRect = new MatOfRect()
classifier.detectMultiScale(greyFrame.frame, detectedObjects, 1.08, 5, 0 | Objdetect.CASCADE_SCALE_IMAGE, new Size(40, 20), new Size(100, 80))
DetectedObjectPos(greyFrame.videoPos, detectedObjects)
}
frameBalance.out(i) ~> detectionFlow.async ~> detectionMerge.in(i)
}
FlowShape(frameBalance.in, detectionMerge.out)
})
def createGraph(videoFile: Video): RunnableGraph[NotUsed] = {
Source.fromGraph(new VideoSource(videoFile))
.via(greyscaleConversion).async
.via(objectDetection)
.to(Sink.foreach(detectionDisplayActor !))
}
Интеграция актеров с .mapAsync
не увеличивая нагрузку на память
val greyscaleConversion: Flow[Frame, Frame, NotUsed] =
Flow[Frame].map { el => Frame(el.videoPos, FrameTransformation.transformToGreyscale(el.frame)) }
val detectionRouter: ActorRef =
actorSystem.actorOf(RandomPool(numberOfDetectors).props(Props[DetectionActor]), "detectionRouter")
val detectionFlow: Flow[Frame, DetectedObjectPos, NotUsed] =
Flow[Frame].mapAsyncUnordered(parallelism = 3)(el => (detectionRouter ? el).mapTo[DetectedObjectPos])
def createGraph(videoFile: Video): RunnableGraph[NotUsed] = {
Source.fromGraph(new VideoSource(videoFile))
.via(greyscaleConversion)
.via(detectionFlow)
.to(Sink.foreach(detectionDisplayActor !))
}