Стратегия контроля потока Akka Kafka не работает

Я запускаю приложение Akka Streams Kafka и хочу включить стратегию контроля для потребителя потока, чтобы в случае сбоя брокера и смерти потребителя потока после истечения времени ожидания супервизор мог перезапустить потребителя.

Вот мой полный код:

UserEventStream:

import akka.actor.{Actor, PoisonPill, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import akka.pattern.ask
import akka.stream.ActorMaterializer

class UserEventStream extends Actor {

  val settings = Settings(context.system).KafkaConsumers
  implicit val timeout: Timeout = Timeout(10 seconds)
  implicit val materializer = ActorMaterializer()

  override def preStart(): Unit = {
    super.preStart()
    println("Starting UserEventStream....s")
  }
  override def receive = {
    case "start" =>
      val consumerConfig = settings.KafkaConsumerInfo
      println(s"ConsumerConfig with $consumerConfig")
      startStreamConsumer(consumerConfig("UserEventMessage" + ".c" + 1))
  }

  def startStreamConsumer(config: Map[String, String]) = {
    println(s"startStreamConsumer with config $config")

    val consumerSource = createConsumerSource(config)
    val consumerSink = createConsumerSink()
    val messageProcessor = context.actorOf(Props[MessageProcessor], "messageprocessor")

    println("START: The UserEventStream processing")
    val future =
      consumerSource
        .mapAsync(parallelism = 50) { message =>
          val m = s"${message.record.value()}"
          messageProcessor ? m
        }
        .runWith(consumerSink)
    future.onComplete {
      case Failure(ex) =>
        println("FAILURE : The UserEventStream processing, stopping the actor.")
        self ! PoisonPill
      case Success(ex) =>
    }
  }

  def createConsumerSource(config: Map[String, String]) = {
    val kafkaMBAddress = config("bootstrap-servers")
    val groupID = config("groupId")
    val topicSubscription = config("subscription-topic").split(',').toList
    println(s"Subscriptiontopics $topicSubscription")

    val consumerSettings = ConsumerSettings(context.system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(kafkaMBAddress)
      .withGroupId(groupID)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription: _*))
  }

  def createConsumerSink() = {
    Sink.foreach(println)
  }
}  

StreamProcessorSupervisor (это класс руководителя UserEventStream учебный класс):

import akka.actor.{Actor, Props}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.ActorMaterializer
import stream.StreamProcessorSupervisor.StartClient
import scala.concurrent.duration._

object StreamProcessorSupervisor {
  final case object StartSimulator
  final case class StartClient(id: String)
  def props(implicit materializer: ActorMaterializer) =
    Props(classOf[StreamProcessorSupervisor], materializer)
}

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor {
  override def preStart(): Unit = {
    self ! StartClient(self.path.name)
  }

  def receive: Receive = {
    case StartClient(id) =>
      println(s"startCLient with id $id")
      val childProps = Props(classOf[UserEventStream])
      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          childProps,
          childName = "usereventstream",
          minBackoff = 1.second,
          maxBackoff = 1.minutes,
          randomFactor = 0.2
        )
      )
      context.actorOf(supervisor, name = s"$id-backoff-supervisor")
      val userEventStrean = context.actorOf(Props(classOf[UserEventStream]),"usereventstream")
      userEventStrean ! "start"
  }
}

App (основной класс приложения):

import akka.actor.{ActorSystem, Props}
import akka.stream.ActorMaterializer

object App extends App {

  implicit val system = ActorSystem("stream-test")
  implicit val materializer = ActorMaterializer()

  system.actorOf(StreamProcessorSupervisor.props,"StreamProcessorSupervisor")
}

application.conf:

kafka {

  consumer {

    num-consumers = "1"
    c1 {
      bootstrap-servers = "localhost:9092"
      bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1}
      groupId = "localakkagroup1"
      subscription-topic = "test"
      subscription-topic = ${?SUBSCRIPTION_TOPIC1}
      message-type = "UserEventMessage"
      poll-interval = 50ms
      poll-timeout = 50ms
      stop-timeout = 30s
      close-timeout = 20s
      commit-timeout = 15s
      wakeup-timeout = 10s
      max-wakeups = 10
      use-dispatcher = "akka.kafka.default-dispatcher"
      kafka-clients {
        enable.auto.commit = true
      }
    }
  }
}

После запуска приложения я намеренно убил брокера Kafka, а затем обнаружил, что через 30 секунд актер останавливает себя, отправляя таблетку с ядом. Но странно, что это не перезапускается, как указано в BackoffSupervisor стратегия.

В чем может быть проблема здесь?

1 ответ

Есть два случая UserEventStream в вашем коде: один дочерний актер, который BackoffSupervisor внутренне создает с Props что вы переходите к нему, а другой val userEventStrean это дитя StreamProcessorSupervisor, Вы отправляете "start" сообщение последнему, когда вы должны отправлять это сообщение первому.

Вам не нужно val userEventStrean, поскольку BackoffSupervisor создает дочернего актера. Сообщения, отправленные на BackoffSupervisor переадресованы ребенку, чтобы отправить "start" сообщение ребенку, отправьте его BackoffSupervisor:

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor {
  override def preStart(): Unit = {
    self ! StartClient(self.path.name)
  }

  def receive: Receive = {
    case StartClient(id) =>
      println(s"startCLient with id $id")
      val childProps = Props[UserEventStream]
      val supervisorProps = BackoffSupervisor.props(...)
      val supervisor = context.actorOf(supervisorProps, name = s"$id-backoff-supervisor")
      supervisor ! "start"
  }
}

Другая проблема заключается в том, что, когда актер получает PoisonPill, это не то же самое, что актер, выбрасывающий исключение. Следовательно, Backoff.onFailure не будет срабатывать, когда UserEventStream отправляет себе PoisonPill, PoisonPill останавливает актера, так что используйте Backoff.onStop вместо:

val supervisorProps = BackoffSupervisor.props(
  Backoff.onStop( // <--- use onStop
    childProps,
    ...
  )
)
val supervisor = context.actorOf(supervisorProps, name = s"$id-backoff-supervisor")
supervisor ! "start"
Другие вопросы по тегам