Akka Streams, тайм-аут, если время между завершенной обработкой одного элемента до следующей обработки
Я хочу, чтобы мой поток не работал, если время между завершением обработки одного элемента и началом обработки следующего элемента превышает определенное количество. Похоже, что ни один из существующих методов тайм-аута не имеет отношения к этому случаю. Как бы я это сделал?
1 ответ
Это наиболее близкое решение, которое я принял (попробуйте здесь):
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import concurrent.duration._
import concurrent.Await
import concurrent.ExecutionContext.Implicits.global
import concurrent.Future
implicit class StreamTakeWithinTime[Out, Mat](src: Source[Out, Mat]) {
def takeWithinTime(maxIdleTime: FiniteDuration): Source[Out, Mat] =
src
.map(Option.apply)
.keepAlive(maxIdleTime, () => None)
.takeWhile {
case Some(_) => true
case None => false
}
.collect {
case Some(x) => x
}
}
implicit val actorSystem = ActorSystem("test")
implicit val actorMaterializer = ActorMaterializer()
var delay = 0
def tick = {
delay += 500
Thread.sleep(delay)
"tick"
}
val maxIdleTime = 2.seconds
val pipeline = Source
.fromIterator(() =>
new Iterator[String] {
override def hasNext: Boolean = true
override def next(): String = tick
})
.map { s =>
println("Long processing function...")
Thread.sleep(3000)
s
}
.takeWithinTime(maxIdleTime)
val res = Await.result(pipeline.runForeach(println), 30.seconds)
println("done")
который печатает:
Long processing function...
tick
Long processing function...
tick
Long processing function...
tick
Long processing function...
done