Akka Streams Reactive Kafka - OutOfMemoryError под большой нагрузкой

Я запускаю приложение Akka Streams Reactive Kafka, которое должно работать при большой нагрузке. После запуска приложения в течение примерно 10 минут, приложение закрывается с OutOfMemoryError, Я попытался отладить дамп кучи и обнаружил, что akka.dispatch.Dispatcher занимает ~5 ГБ памяти. Ниже приведены мои конфигурационные файлы.

Акка версия: 2.4.18

Реактивная версия Kafka: 2.4.18

1.application.conf:

consumer {
num-consumers = "2"
c1 {
  bootstrap-servers = "localhost:9092"
  bootstrap-servers=${?KAFKA_CONSUMER_ENDPOINT1}
  groupId = "testakkagroup1"
  subscription-topic = "test"
  subscription-topic=${?SUBSCRIPTION_TOPIC1}
  message-type = "UserEventMessage"
  poll-interval = 100ms
  poll-timeout = 50ms
  stop-timeout = 30s
  close-timeout = 20s
  commit-timeout = 15s
  wakeup-timeout = 10s
  use-dispatcher = "akka.kafka.default-dispatcher"
  kafka-clients {
    enable.auto.commit = true
  }
}  

2.build.sbt:

java -Xmx6g \
-Dcom.sun.management.jmxremote.port=27019 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=localhost \
-Dzookeeper.host=$ZK_HOST \
-Dzookeeper.port=$ZK_PORT \
-jar ./target/scala-2.11/test-assembly-1.0.jar   

3.Source а также Sink актеры:

class EventStream extends Actor with ActorLogging {

  implicit val actorSystem = context.system
  implicit val timeout: Timeout = Timeout(10 seconds)
  implicit val materializer = ActorMaterializer()
  val settings = Settings(actorSystem).KafkaConsumers

  override def receive: Receive = {
    case StartUserEvent(id) =>
      startStreamConsumer(consumerConfig("EventMessage"+".c"+id))
  }

  def startStreamConsumer(config: Map[String, String]) = {
    val consumerSource = createConsumerSource(config)

    val consumerSink = createConsumerSink()

    val messageProcessor = startMessageProcessor(actorA, actorB, actorC)

    log.info("Starting The UserEventStream processing")

    val future = consumerSource.map { message =>
      val m = s"${message.record.value()}"
      messageProcessor ? m
    }.runWith(consumerSink)

    future.onComplete {
      case _ => actorSystem.stop(messageProcessor)
    }
  }

  def startMessageProcessor(actorA: ActorRef, actorB: ActorRef, actorC: ActorRef) = {
    actorSystem.actorOf(Props(classOf[MessageProcessor], actorA, actorB, actorC))  
  }

  def createConsumerSource(config: Map[String, String]) = {
    val kafkaMBAddress = config("bootstrap-servers")
    val groupID = config("groupId")
    val topicSubscription = config("subscription-topic").split(',').toList
    println(s"Subscriptiontopics $topicSubscription")

    val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(kafkaMBAddress)
      .withGroupId(groupID)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true")

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription:_*))
  }

  def createConsumerSink() = {
    Sink.foreach(println)
  }
}    

В этом случае actorA, actorB, а также actorC занимаемся обработкой бизнес-логики и взаимодействием с базой данных. Есть ли что-то, чего мне не хватает при работе с потребителями Akka Reactive Kafka, такие как фиксация, ошибка или настройка регулирования? Поскольку, глядя в дамп кучи, я мог догадаться, что сообщения накапливаются.

1 ответ

Решение

Я бы хотел изменить одну вещь:

val future = consumerSource.map { message =>
  val m = s"${message.record.value()}"
  messageProcessor ? m
}.runWith(consumerSink)

В приведенном выше коде вы используете ask отправлять сообщения messageProcessor Актер и ожидайте ответов, но для того, чтобы ask чтобы функционировать как механизм противодавления, вы должны использовать его с mapAsync (больше информации в документации). Что-то вроде следующего:

val future =
  consumerSource
    .mapAsync(parallelism = 5) { message =>
      val m = s"${message.record.value()}"
      messageProcessor ? m
    }
    .runWith(consumerSink)

Отрегулируйте уровень параллелизма по мере необходимости.

Другие вопросы по тегам