Akka Stream Kafka, полный поток, когда достигнут конец журнала

Я использую Akka Streams Kafka и ищу способ сделать следующее:

  • Начало потока со смещения x
  • Потребляйте последовательно предметы x, x+1, x+2.. до последнего пункта
  • Как только последний элемент был израсходован, завершите поток.

Код будет выглядеть примерно так

Consumer
  .plainSource(consumerSettings, subscription)
  .runForeach(println("got record!"))
  .onComplete {
    case Success(_) => // all items read
    case Failure(error) => // error
  }

и завершится после прочтения последнего элемента. Возможно, это не тот способ, которым эта библиотека предназначена для использования. Как мне этого добиться?

1 ответ

Решение

Akka Consumer работает по принципу "тянуть", он будет жить вечно, если не возникнут ошибки, связанные с брокером. Но когда вы считаете, что поток закончился? Кафку можно рассматривать как распределенный журнал, откуда вы читаете сообщения с заданным смещением. Пока ваш клиент подключен к брокеру, ваш клиент будет работать и работать... Если вы считаете, что завершение вашего потока, когда от Кафки не происходит никаких событий в течение определенного промежутка времени (например), вы можете использовать idleTimeout:

  Consumer
    .plainSource(consumerSettings, subscription)
    .idleTimeout(10 seconds)
    .runForeach(e => println("E"))
    .onComplete {
      case Success(_) => // all items read
      case Failure(error) =>
      // TimeoutException if no element in ten seconds the stream stops throwing this exception
    }

Другой возможностью может быть использование этапа Fan-In, в частности MergePreferred. Мы можем создать еще один источник тиков, который генерирует события за определенный промежуток времени. Источник Кафки будет иметь предпочтение, так как, поскольку элементы приходят из Кафки, сцена всегда будет извлекать элементы из этого источника. Если в каком-то интервале нет элементов, строка "Timeout" будет перемещена вниз по течению. Что-то вроде:

  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val ec = actorSystem.dispatcher

  val consumer =
  Consumer
    .plainSource(consumerSettings, subscription)
    .map(_.value())

  val tick = Source.tick(50 millis, 30 seconds, "Timeout")

  val source = GraphDSL.create(consumer, tick)(Keep.both) { implicit b ⇒
    (r1, r2) ⇒
      val merge = b.add(MergePreferred[String](1, false))
      r2 ~> merge.in(0)
      r1 ~> merge.preferred
      SourceShape(merge.out)
  }

  Source
    .fromGraph(source)
    .takeWhile(el => el != "Timeout")
    .runForeach(msg => println(msg))
  .onComplete{
    case Success(_) => println("Stream ended")
    case Failure(error) => println("There was an error")
  }

С takeWhile поток будет активным, пока есть элементы из Кафки.

Это только один подход. У Akka Stream есть много разных этапов и Graph Api, чтобы справиться с этими ситуациями, возможно, более элегантно.

Другие вопросы по тегам