Reactive-Kafka Stream Consumer: найдены мертвые буквы
Я пытаюсь использовать сообщения от Кафки, используя реактивную библиотеку Акки. Я получаю одно сообщение, и после этого я получил
[INFO] [01/24/2017 10:36:52.934] [CommittableSourceConsumerMain-akka.actor.default-dispatcher-5] [akka://CommittableSourceConsumerMain/system/kafka-consumer-1] Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] from Actor[akka://CommittableSourceConsumerMain/deadLetters] to Actor[akka://CommittableSourceConsumerMain/system/kafka-consumer-1#-1726905274] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Это код, который я выполняю
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import play.api.libs.json._
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
object CommittableSourceConsumerMain extends App {
implicit val system = ActorSystem("CommittableSourceConsumerMain")
implicit val materializer = ActorMaterializer()
val consumerSettings =ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer).withBootstrapServers("localhost:9092").withGroupId("CommittableSourceConsumer").withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val done =
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
val record=(msg.record.value())
val data=Json.parse(record)
val recordType=data \ "data" \"event" \"type"
val actualData=data \ "data" \ "row"
if(recordType.as[String]=="created"){
"Some saving logic"
}
else{
"Some logic"
}
msg.committableOffset.commitScaladsl()
}
.runWith(Sink.ignore)
}
1 ответ
Решение
Я наконец разобрался с решением. Из-за исключения времени выполнения в потоке Future
of error возвращается, что немедленно прекращает поток.
Akka-stream не предоставляет и не отображает исключение времени выполнения. Чтобы узнать исключение
done.onFailure{
case NonFatal(e)=>println(e)
}
Исключение было в блоке if-else. Также можно использовать Actor Strategy для возобновления потока в случае возникновения исключения.