Наблюдаемая группа Monix по большому количеству ключей без утечек памяти

Я пытаюсь выполнить расщепление сингла Observable в Monix по ключу, затем группа до последнего n события в каждом GrouppedObservable и отправить их для дальнейшей обработки. Проблема в том, что количество ключей для группировки, возможно, бесконечно, и это вызывает утечки памяти.

Контекст приложения:

У меня есть поток кафки с сообщениями из многих разговоров. Каждый разговор имеет roomId и я хочу сгруппировать этот идентификатор, чтобы получить коллекцию Observables, каждая из которых содержит только сообщения из одного разговора. Комнаты для разговоров, как правило, недолговечны, т.е. создаются новые разговоры с уникальными roomId за короткий промежуток времени происходит обмен несколькими десятками сообщений, затем разговор закрывается. Чтобы избежать утечек памяти, я хочу сохранить буферы только 100-1000 самых последних разговоров и удалить старые. Таким образом, если событие происходит из долгого невидимого разговора, оно будет рассматриваться как новый разговор, потому что буфер с его предыдущими сообщениями будет забыт.

У метода groupBy в Monix есть аргумент keysBuffer это определяет, как обращаться с ключевыми буферами.

Я думал что указав keyBuffer Стратегия DropOld позволит мне добиться желаемого поведения.

Ниже приведен упрощенный вариант описанного варианта использования.

import monix.execution.Scheduler.Implicits.global
import monix.reactive._

import scala.concurrent.duration._
import scala.util.Random

case class Event(key: Key, value: String, seqNr: Int) {
  override def toString: String = s"(k:$key;s:$seqNr)"
}

case class Key(conversationId: Int, messageNr: Int)

object Main {
  def main(args: Array[String]): Unit = {

    val fakeConsumer = Consumer.foreach(println)
    val kafkaSimulator = Observable.interval(1.millisecond)
      .map(n => generateHeavyEvent(n.toInt))

    val groupedMessages = kafkaSimulator.groupBy(_.key)(OverflowStrategy.DropOld(50))
      .mergeMap(slidingWindow)

    groupedMessages.consumeWith(fakeConsumer).runSyncUnsafe()
  }

  def slidingWindow[T](source: Observable[T]): Observable[Seq[T]] =
    source.scan(List.empty[T])(fixedSizeList)

  def fixedSizeList[T](list: List[T], elem: T): List[T] =
    (list :+ elem).takeRight(5)

  def generateHeavyEvent(n: Int): Event = {
    val conversationId: Int = n / 500
    val messageNr: Int = n % 5
    val key = Key(conversationId, messageNr)
    val value = (1 to 1000).map(_ => Random.nextPrintableChar()).toString()
    Event(key, value, n)
  }

}

Однако наблюдение за кучей приложений в VisualVM указывает на утечку памяти. Примерно через 30 минут бега я получил java.lang.OutOfMemoryError: GC overhead limit exceeded

Ниже приведен скриншот графиков использования кучи с изображением запуска моего приложения в течение 30 минут. (Уплощенная часть в конце после OutOfMemoryError)

VisualVM Куча сюжет приложения

Мой вопрос: как я могу сгруппировать события в monix, возможно, бесконечное количество ключей без утечки памяти? Старые ключи могут быть сброшены

Справочная информация:

  • моникс версия: 3.0.0-RC2
  • версия для Scala: 2.12.8

0 ответов

У меня есть аналогичный вариант использования, как у вас, чтение потока kafka и группировка по идентификатору.

Что вы хотите сделать, так это тайм-аут / очистить GrouppedObservableкогда нет спроса. В противном случае он останется в памяти навсегда. Итак, вы можете сделать что-то вроде этого:

val eventsStream: Observable[Int] = ???

eventsStream
  .groupBy(_ % 2 == 0)
  .mergeMap {
    _.mapEval(s => Task.delay(println(s)))
     .timeoutOnSlowUpstreamTo(5.minutes, Observable.empty)
  }
  .completedL

Другие вопросы по тегам