Akka Streams, тайм-аут, если время между завершенной обработкой одного элемента до следующей обработки

Я хочу, чтобы мой поток не работал, если время между завершением обработки одного элемента и началом обработки следующего элемента превышает определенное количество. Похоже, что ни один из существующих методов тайм-аута не имеет отношения к этому случаю. Как бы я это сделал?

1 ответ

Это наиболее близкое решение, которое я принял (попробуйте здесь):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import concurrent.duration._
import concurrent.Await
import concurrent.ExecutionContext.Implicits.global
import concurrent.Future

implicit class StreamTakeWithinTime[Out, Mat](src: Source[Out, Mat]) {
  def takeWithinTime(maxIdleTime: FiniteDuration): Source[Out, Mat] =
    src
      .map(Option.apply)
      .keepAlive(maxIdleTime, () => None)
      .takeWhile {
        case Some(_) => true
        case None    => false
      }
      .collect {
        case Some(x) => x
      }
}

implicit val actorSystem = ActorSystem("test")
implicit val actorMaterializer = ActorMaterializer()

var delay = 0
def tick = {
  delay += 500
  Thread.sleep(delay)
  "tick"
}

val maxIdleTime = 2.seconds

val pipeline = Source
  .fromIterator(() =>
    new Iterator[String] {
      override def hasNext: Boolean = true
      override def next(): String = tick
  })
  .map { s =>
    println("Long processing function...")
    Thread.sleep(3000)
    s
  }
  .takeWithinTime(maxIdleTime)

val res = Await.result(pipeline.runForeach(println), 30.seconds)
println("done")

который печатает:

Long processing function...
tick
Long processing function...
tick
Long processing function...
tick
Long processing function...
done
Другие вопросы по тегам