Наблюдаемая группа Monix по большому количеству ключей без утечек памяти
Я пытаюсь выполнить расщепление сингла Observable
в Monix по ключу, затем группа до последнего n
события в каждом GrouppedObservable
и отправить их для дальнейшей обработки. Проблема в том, что количество ключей для группировки, возможно, бесконечно, и это вызывает утечки памяти.
Контекст приложения:
У меня есть поток кафки с сообщениями из многих разговоров. Каждый разговор имеет roomId
и я хочу сгруппировать этот идентификатор, чтобы получить коллекцию Observables, каждая из которых содержит только сообщения из одного разговора. Комнаты для разговоров, как правило, недолговечны, т.е. создаются новые разговоры с уникальными roomId
за короткий промежуток времени происходит обмен несколькими десятками сообщений, затем разговор закрывается. Чтобы избежать утечек памяти, я хочу сохранить буферы только 100-1000 самых последних разговоров и удалить старые. Таким образом, если событие происходит из долгого невидимого разговора, оно будет рассматриваться как новый разговор, потому что буфер с его предыдущими сообщениями будет забыт.
У метода groupBy в Monix есть аргумент keysBuffer
это определяет, как обращаться с ключевыми буферами.
Я думал что указав keyBuffer
Стратегия DropOld позволит мне добиться желаемого поведения.
Ниже приведен упрощенный вариант описанного варианта использования.
import monix.execution.Scheduler.Implicits.global
import monix.reactive._
import scala.concurrent.duration._
import scala.util.Random
case class Event(key: Key, value: String, seqNr: Int) {
override def toString: String = s"(k:$key;s:$seqNr)"
}
case class Key(conversationId: Int, messageNr: Int)
object Main {
def main(args: Array[String]): Unit = {
val fakeConsumer = Consumer.foreach(println)
val kafkaSimulator = Observable.interval(1.millisecond)
.map(n => generateHeavyEvent(n.toInt))
val groupedMessages = kafkaSimulator.groupBy(_.key)(OverflowStrategy.DropOld(50))
.mergeMap(slidingWindow)
groupedMessages.consumeWith(fakeConsumer).runSyncUnsafe()
}
def slidingWindow[T](source: Observable[T]): Observable[Seq[T]] =
source.scan(List.empty[T])(fixedSizeList)
def fixedSizeList[T](list: List[T], elem: T): List[T] =
(list :+ elem).takeRight(5)
def generateHeavyEvent(n: Int): Event = {
val conversationId: Int = n / 500
val messageNr: Int = n % 5
val key = Key(conversationId, messageNr)
val value = (1 to 1000).map(_ => Random.nextPrintableChar()).toString()
Event(key, value, n)
}
}
Однако наблюдение за кучей приложений в VisualVM указывает на утечку памяти. Примерно через 30 минут бега я получил java.lang.OutOfMemoryError: GC overhead limit exceeded
Ниже приведен скриншот графиков использования кучи с изображением запуска моего приложения в течение 30 минут. (Уплощенная часть в конце после OutOfMemoryError
)
VisualVM Куча сюжет приложения
Мой вопрос: как я могу сгруппировать события в monix, возможно, бесконечное количество ключей без утечки памяти? Старые ключи могут быть сброшены
Справочная информация:
- моникс версия:
3.0.0-RC2
- версия для Scala:
2.12.8
0 ответов
У меня есть аналогичный вариант использования, как у вас, чтение потока kafka и группировка по идентификатору.
Что вы хотите сделать, так это тайм-аут / очистить GrouppedObservable
когда нет спроса. В противном случае он останется в памяти навсегда. Итак, вы можете сделать что-то вроде этого:
val eventsStream: Observable[Int] = ???
eventsStream
.groupBy(_ % 2 == 0)
.mergeMap {
_.mapEval(s => Task.delay(println(s)))
.timeoutOnSlowUpstreamTo(5.minutes, Observable.empty)
}
.completedL