Akka streams kafka фиксирует смещение после фильтра
Я пытаюсь использовать стратегию принятия по крайней мере один раз для смещения в потоках akka, и я не могу понять, какова ожидаемая схема для случаев, когда я использую фильтр в своем потоке.
я ожидаю, что ни одно из отфильтрованных сообщений не получит комментирования своего смещения, поэтому они окажутся в бесконечном цикле обработки.
Пример abasurd, иллюстрирующий это, фильтрует все сообщения следующим образом:
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.filter(_ => false)
.mapAsync(3)(_.committableOffset.commitScaladsl())
.runWith(Sink.ignore)
Я вижу только решение обернуть мои фильтры в потоки, которые проверяют, будет ли логика отфильтровываться и фиксироваться в этом случае, но это кажется не элегантным и снижает ценность наличия форм фильтров.
Фильтрация не редкость, но я не вижу элегантного способа смещения? для меня кажется странным, что нет никакой возможности сделать это с помощью фреймворка, так чего мне не хватает?
1 ответ
Мне не удалось найти решение с текущей реализацией akka, чтобы иметь более интеллектуальную фиксацию индекса, поэтому я делегировал ответственность за установку автоматической фиксации kafka на уровне kafka, а также совместил это со стратегией изящного завершения работы приложения. поэтому, когда происходит сине-зеленое развертывание, все сообщения обрабатываются до закрытия приложения.
- Автокоммит на истину:
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
- Изящное отключение:
val actorMaterializer = ActorMaterializer(
ActorMaterializerSettings(system)
scala.sys.addShutdownHook {
actorMaterializer.system.terminate()
Await.result(actorMaterializer.system.whenTerminated, 30.seconds)
}