Получение последнего сообщения от kafka Тема с использованием akka-stream-kafka при соединении с websocket

Можно ли вообще получить последнее сообщение по теме Kafka, используя Akka Streams Kafka? Я создаю веб-сокет, который прослушивает тему Кафки, но в настоящее время он извлекает все предыдущие unred сообщения при подключении. Это может составить довольно много сообщений, так что меня интересует только последнее сообщение + любые будущие сообщения. (или только будущие сообщения)

Источник:

def source(): Flow[Any, String, NotUsed] = {
  val source = Consumer.plainSource(consumerSettings, Subscriptions.topics(MyTopic))
  Flow.fromSinkAndSource[Any, String](Sink.ignore, source.map(_.value)
}

Настройки потребителя:

  @Provides
def providesConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
  val deserializer = new StringDeserializer()
  val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
    .getOrElse(Configuration.empty)

  ConsumerSettings(config.underlying, deserializer, deserializer)
    .withBootstrapServers(kafkaUrl)
    .withGroupId(GroupId)
}

Я попытался добавить настройку ConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

Который должен "автоматически сбрасывать смещение на самое последнее смещение", но это, похоже, не имеет никакого эффекта.

1 ответ

Решение

Мне удалось избежать получения каких-либо исходящих данных при подключении к клиенту, используя метод, очень аккуратно описанный здесь Дэвидом ван Гестом.

Это сводится к наличию BroadcastHub на Потребителе:

val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))
.toMat(BroadcastHub.sink)(Keep.right)
.run()

И подключение статического потребителя, чтобы съесть все вышестоящие данные

liveSource.to(Sink.ignore).run()

Впоследствии это позволяет мне подписаться на клиент WebSocket на все данные, полученные потребителем как таковые:

def source(): Flow[Any, String, NotUsed] = {Flow.fromSinkAndSource(Sink.ignore, liveSource)}

Или фильтр на основе KafkaTopic (или что-то еще, что вы хотите)

def KafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] = {
  Flow.fromSinkAndSource(Sink.ignore, liveSource.filter({
    x =>
      (Json.parse(x) \ "topic").asOpt[String] match {
        case Some(str) => str.equals(kafkaTopic)
        case None => false
      }
  }))
}

Это не решает проблему предоставления x объема данных пользователю при первом подключении, но я предвижу, что мы добавим простой запрос к базе данных для любых исторических данных и позволим подключению WebSocket сосредоточиться только на данных прямого вещания.

Другие вопросы по тегам