Scala Futures - встроенный тайм-аут?
Есть аспект будущего, который я не совсем понимаю из официального руководства. http://docs.scala-lang.org/overviews/core/futures.html
Есть ли у фьючерса в Scala встроенный механизм тайм-аута? Допустим, приведенный ниже пример представлял собой текстовый файл объемом 5 гигабайт... в конечном итоге подразумеваемая область действия "Implicits.global" приводит к тому, что onFailure запускается неблокирующим образом, или это можно определить? И без какого-либо тайм-аута по умолчанию, разве это не значит, что ни успех, ни неудача никогда не сработают?
import scala.concurrent._
import ExecutionContext.Implicits.global
val firstOccurence: Future[Int] = future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
case t => println("Could not process file: " + t.getMessage)
}
10 ответов
Вы получаете поведение тайм-аута только тогда, когда вы используете блокировку, чтобы получить результаты Future
, Если вы хотите использовать неблокирующие обратные вызовы onComplete
, onSuccess
или же onFailure
, тогда вам придется свернуть свой собственный тайм-аут обработки. Акка имеет встроенную обработку тайм-аута для запроса / ответа (?
) обмен сообщениями между актерами, но не уверен, если вы хотите начать использовать Akka. FWIW, в Akka, для обработки тайм-аута, они составляют два Futures
вместе через Future.firstCompletedOf
тот, который представляет фактическую асинхронную задачу, и тот, который представляет тайм-аут. Если тайм-аут (через HashedWheelTimer
) появляется сначала, вы получаете ошибку при асинхронном обратном вызове.
Очень упрощенный пример скручивания собственного может пойти примерно так. Во-первых, объект для планирования тайм-аутов:
import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException
object TimeoutScheduler{
val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
def scheduleTimeout(promise:Promise[_], after:Duration) = {
timer.newTimeout(new TimerTask{
def run(timeout:Timeout){
promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))
}
}, after.toNanos, TimeUnit.NANOSECONDS)
}
}
Затем функция, которая берет Future и добавляет к нему поведение времени ожидания:
import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration
def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
val prom = Promise[T]()
val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
fut onComplete{case result => timeout.cancel()}
combinedFut
}
Обратите внимание, что HashedWheelTimer
Я пользуюсь вот от Нетти.
Все эти ответы требуют дополнительных зависимостей. Я решил написать версию, используя java.util.Timer, который является эффективным способом запуска функции в будущем, в данном случае для запуска тайм-аута.
Сообщение в блоге с более подробной информацией здесь
Используя это с обещанием Scala, мы можем создать будущее с таймаутом следующим образом:
package justinhj.concurrency
import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps
object FutureUtil {
// All Future's that use futureWithTimeout will use the same Timer object
// it is thread safe and scales to thousands of active timers
// The true parameter ensures that timeout timers are daemon threads and do not stop
// the program from shutting down
val timer: Timer = new Timer(true)
/**
* Returns the result of the provided future within the given time or a timeout exception, whichever is first
* This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
* Thread.sleep would
* @param future Caller passes a future to execute
* @param timeout Time before we return a Timeout exception instead of future's outcome
* @return Future[T]
*/
def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
// Promise will be fulfilled with either the callers Future or the timer task if it times out
val p = Promise[T]
// and a Timer task to handle timing out
val timerTask = new TimerTask() {
def run() : Unit = {
p.tryFailure(new TimeoutException())
}
}
// Set the timeout to check in the future
timer.schedule(timerTask, timeout.toMillis)
future.map {
a =>
if(p.trySuccess(a)) {
timerTask.cancel()
}
}
.recover {
case e: Exception =>
if(p.tryFailure(e)) {
timerTask.cancel()
}
}
p.future
}
}
Я только что создал TimeoutFuture
класс для сотрудника:
TimeoutFuture
package model
import scala.concurrent._
import scala.concurrent.duration._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._
object TimeoutFuture {
def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
val prom = promise[A]
// timeout logic
Akka.system.scheduler.scheduleOnce(timeout) {
prom tryFailure new java.util.concurrent.TimeoutException
}
// business logic
Future {
prom success block
}
prom.future
}
}
использование
val future = TimeoutFuture(10 seconds) {
// do stuff here
}
future onComplete {
case Success(stuff) => // use "stuff"
case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}
Заметки:
- Предполагает играть! рамки (но это достаточно легко адаптировать)
- Каждый кусок кода работает в одном и том же
ExecutionContext
что не может быть идеальным.
Play Framework содержит Promise.timeout, так что вы можете написать код, как показано ниже
private def get(): Future[Option[Boolean]] = {
val timeoutFuture = Promise.timeout(None, Duration("1s"))
val mayBeHaveData = Future{
// do something
Some(true)
}
// if timeout occurred then None will be result of method
Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
}
Я весьма удивлен, что это не стандартно в Scala. Мои версии короткие и не имеют зависимостей
import scala.concurrent.Future
sealed class TimeoutException extends RuntimeException
object FutureTimeout {
import scala.concurrent.ExecutionContext.Implicits.global
implicit class FutureTimeoutLike[T](f: Future[T]) {
def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
Thread.sleep(ms)
throw new TimeoutException
}))
lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
}
}
Пример использования
import FutureTimeout._
Future { /* do smth */ } withTimeout
Если вы хотите, чтобы автором (держателем обещаний) был тот, кто контролирует логику тайм-аута, используйте akka.pattern.after следующим образом:
val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
Future.firstCompletedOf(Seq(promiseRef.future, timeout))
Таким образом, если ваша логика завершения обещания никогда не выполняется, будущее вашего абонента все равно будет завершено в какой-то момент с ошибкой.
Вы можете указать время ожидания при ожидании на будущее:
За scala.concurrent.Future
, result
Метод позволяет указать время ожидания.
За scala.actors.Future
, Futures.awaitAll
позволяет указать время ожидания.
Я не думаю, что есть тайм-аут, встроенный в исполнение Future.
Никто не упоминается akka-streams
, еще. Потоки имеют легкий completionTimeout
метод, и применение этого к потоку с одним источником работает как будущее.
Но akka-streams также выполняет отмену, поэтому он может фактически прекратить работу источника, т.е. он сигнализирует об истечении времени ожидания источника.
Эта версия работает без использования тайм-аута
import scala.concurrent._
import scala.concurrent.duration.FiniteDuration
object TimeoutFuture {
def apply[A](
timeout: FiniteDuration
)(block: => A)(implicit executor: ExecutionContext): Future[A] =
try {
Future { Await.result(Future { block }, timeout) }
} catch {
case _: TimeoutException => Future.failed(new TimeoutException(s"Timed out after ${timeout.toString}"))
}
}
Monix Task
имеет поддержку тайм-аута
import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._
import scala.concurrent.TimeoutException
val source = Task("Hello!").delayExecution(10.seconds)
// Triggers error if the source does not complete in 3 seconds after runOnComplete
val timedOut = source.timeout(3.seconds)
timedOut.runOnComplete(r => println(r))
//=> Failure(TimeoutException)
Вы можете дождаться завершения будущего, используя Await
.
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
val meaningOfLife: Int = Await.result(Future(42), 1.nano)
println (meaningOfLife)
Приведенные выше отпечатки 42
Вам может понадобиться неявный ExecutionContext
доступно, и в этом случае просто добавьте:
import scala.concurrent.ExecutionContext.Implicits.global
Другой способ сделать это - использовать Coeval
от monix. Этот метод работает не во всех ситуациях, и вы можете прочитать о нем здесь. Основная идея состоит в том, что иногда будущее на самом деле не занимает какое-то время и возвращает результат синхронного вызова функции или значения, поэтому это будущее может быть оценено в текущем потоке. Это также полезно для тестирования и имитации будущего. Также вам не нужно указывать ожидаемый тайм-аут, но все же приятно, что об этом не нужно беспокоиться.
Вы начинаете с превращения будущего в Task
и оберните эту задачу в Coeval
затем скрестите пальцы, ожидая увидеть, что у вас получится. Это очень простой пример, демонстрирующий, как это работает:
Вам нужен неявный Scheduler
чтобы иметь возможность его использовать:
import monix.execution.Scheduler.Implicits.global
Coeval(Task.fromFuture(Future (42)).runSyncStep).value() match {
case Right(v) => println(v)
case Left(task) => println("Task did not finish")
}
Вышеупомянутое завершает и печатает 42
к консоли.
Coeval(Task.fromFuture(Future {
scala.concurrent.blocking {
42
}
}).runSyncStep).value() match {
case Right(v) => println(v)
case Left(task) => println("Task did not finish")
}
Этот пример печатает Task did not finish
:
You can simply run the future to completion without giving any timeout interval by setting the timeout to infinite as below:
**import scala.concurrent.duration._
Await.result(run(executionContext), Duration.Inf)**
run function can be as below :
def run(implicit ec: ExecutionContext) = {
val list = Seq(
Future { println("start 1"); Thread.sleep(1000); println("stop 1")},
Future { println("start 2"); Thread.sleep(2000); println("stop 2")},
Future { println("start 3"); Thread.sleep(3000); println("stop 3")},
Future { println("start 4"); Thread.sleep(4000); println("stop 4")},
Future { println("start 5"); Thread.sleep(5000); println("stop 5")}
)
Future.sequence(list)
}
Я использую эту версию (основанную на примере Play выше), которая использует диспетчер системы Akka:
object TimeoutFuture {
def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = {
implicit val executionContext = system.dispatcher
val prom = Promise[A]
// timeout logic
system.scheduler.scheduleOnce(timeout) {
prom tryFailure new java.util.concurrent.TimeoutException
}
// business logic
Future {
try {
prom success block
} catch {
case t: Throwable => prom tryFailure t
}
}
prom.future
}
}
Самый простой способ указать время ожидания в Future IMO - это встроенный механизм scala, использующий scala.concurrent.Await.ready
Это бросит TimeoutException
если будущее занимает больше времени, чем указанное время ожидания. Иначе оно вернет само Будущее. Вот простой надуманный пример
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.duration._
val f1: Future[Int] = Future {
Thread.sleep(1100)
5
}
val fDoesntTimeout: Future[Int] = Await.ready(f1, 2000 milliseconds)
val f: Future[Int] = Future {
Thread.sleep(1100)
5
}
val fTimesOut: Future[Int] = Await.ready(f, 100 milliseconds)