Akka streams kafka фиксирует смещение после фильтра

Я пытаюсь использовать стратегию принятия по крайней мере один раз для смещения в потоках akka, и я не могу понять, какова ожидаемая схема для случаев, когда я использую фильтр в своем потоке.

я ожидаю, что ни одно из отфильтрованных сообщений не получит комментирования своего смещения, поэтому они окажутся в бесконечном цикле обработки.

Пример abasurd, иллюстрирующий это, фильтрует все сообщения следующим образом:

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.filter(_ => false)
.mapAsync(3)(_.committableOffset.commitScaladsl()) 
.runWith(Sink.ignore)

Я вижу только решение обернуть мои фильтры в потоки, которые проверяют, будет ли логика отфильтровываться и фиксироваться в этом случае, но это кажется не элегантным и снижает ценность наличия форм фильтров.

Фильтрация не редкость, но я не вижу элегантного способа смещения? для меня кажется странным, что нет никакой возможности сделать это с помощью фреймворка, так чего мне не хватает?

1 ответ

Мне не удалось найти решение с текущей реализацией akka, чтобы иметь более интеллектуальную фиксацию индекса, поэтому я делегировал ответственность за установку автоматической фиксации kafka на уровне kafka, а также совместил это со стратегией изящного завершения работы приложения. поэтому, когда происходит сине-зеленое развертывание, все сообщения обрабатываются до закрытия приложения.

  • Автокоммит на истину:
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
  • Изящное отключение:
val actorMaterializer = ActorMaterializer(
  ActorMaterializerSettings(system)
scala.sys.addShutdownHook {
        actorMaterializer.system.terminate()
        Await.result(actorMaterializer.system.whenTerminated, 30.seconds)
}
Другие вопросы по тегам