Scala Futures - встроенный тайм-аут?

Есть аспект будущего, который я не совсем понимаю из официального руководства. http://docs.scala-lang.org/overviews/core/futures.html

Есть ли у фьючерса в Scala встроенный механизм тайм-аута? Допустим, приведенный ниже пример представлял собой текстовый файл объемом 5 гигабайт... в конечном итоге подразумеваемая область действия "Implicits.global" приводит к тому, что onFailure запускается неблокирующим образом, или это можно определить? И без какого-либо тайм-аута по умолчанию, разве это не значит, что ни успех, ни неудача никогда не сработают?

import scala.concurrent._
import ExecutionContext.Implicits.global

val firstOccurence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}

10 ответов

Решение

Вы получаете поведение тайм-аута только тогда, когда вы используете блокировку, чтобы получить результаты Future, Если вы хотите использовать неблокирующие обратные вызовы onComplete, onSuccess или же onFailure, тогда вам придется свернуть свой собственный тайм-аут обработки. Акка имеет встроенную обработку тайм-аута для запроса / ответа (?) обмен сообщениями между актерами, но не уверен, если вы хотите начать использовать Akka. FWIW, в Akka, для обработки тайм-аута, они составляют два Futures вместе через Future.firstCompletedOfтот, который представляет фактическую асинхронную задачу, и тот, который представляет тайм-аут. Если тайм-аут (через HashedWheelTimer) появляется сначала, вы получаете ошибку при асинхронном обратном вызове.

Очень упрощенный пример скручивания собственного может пойти примерно так. Во-первых, объект для планирования тайм-аутов:

import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException

object TimeoutScheduler{
  val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
  def scheduleTimeout(promise:Promise[_], after:Duration) = {
    timer.newTimeout(new TimerTask{
      def run(timeout:Timeout){              
        promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))        
      }
    }, after.toNanos, TimeUnit.NANOSECONDS)
  }
}

Затем функция, которая берет Future и добавляет к нему поведение времени ожидания:

import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration

def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
  val prom = Promise[T]()
  val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
  val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
  fut onComplete{case result => timeout.cancel()}
  combinedFut
}

Обратите внимание, что HashedWheelTimer Я пользуюсь вот от Нетти.

Все эти ответы требуют дополнительных зависимостей. Я решил написать версию, используя java.util.Timer, который является эффективным способом запуска функции в будущем, в данном случае для запуска тайм-аута.

Сообщение в блоге с более подробной информацией здесь

Используя это с обещанием Scala, мы можем создать будущее с таймаутом следующим образом:

package justinhj.concurrency

import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps

object FutureUtil {

  // All Future's that use futureWithTimeout will use the same Timer object
  // it is thread safe and scales to thousands of active timers
  // The true parameter ensures that timeout timers are daemon threads and do not stop
  // the program from shutting down

  val timer: Timer = new Timer(true)

  /**
    * Returns the result of the provided future within the given time or a timeout exception, whichever is first
    * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
    * Thread.sleep would
    * @param future Caller passes a future to execute
    * @param timeout Time before we return a Timeout exception instead of future's outcome
    * @return Future[T]
    */
  def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {

    // Promise will be fulfilled with either the callers Future or the timer task if it times out
    val p = Promise[T]

    // and a Timer task to handle timing out

    val timerTask = new TimerTask() {
      def run() : Unit = {
            p.tryFailure(new TimeoutException())
        }
      }

    // Set the timeout to check in the future
    timer.schedule(timerTask, timeout.toMillis)

    future.map {
      a =>
        if(p.trySuccess(a)) {
          timerTask.cancel()
        }
    }
    .recover {
      case e: Exception =>
        if(p.tryFailure(e)) {
          timerTask.cancel()
        }
    }

    p.future
  }

}

Я только что создал TimeoutFuture класс для сотрудника:

TimeoutFuture

package model

import scala.concurrent._
import scala.concurrent.duration._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._

object TimeoutFuture {
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {

    val prom = promise[A]

    // timeout logic
    Akka.system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic
    Future { 
      prom success block
    }

    prom.future
  } 
}

использование

val future = TimeoutFuture(10 seconds) { 
  // do stuff here
}

future onComplete {
  case Success(stuff) => // use "stuff"
  case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}

Заметки:

  • Предполагает играть! рамки (но это достаточно легко адаптировать)
  • Каждый кусок кода работает в одном и том же ExecutionContext что не может быть идеальным.

Play Framework содержит Promise.timeout, так что вы можете написать код, как показано ниже

private def get(): Future[Option[Boolean]] = {
  val timeoutFuture = Promise.timeout(None, Duration("1s"))
  val mayBeHaveData = Future{
    // do something
    Some(true)
  }

  // if timeout occurred then None will be result of method
  Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
}

Я весьма удивлен, что это не стандартно в Scala. Мои версии короткие и не имеют зависимостей

import scala.concurrent.Future

sealed class TimeoutException extends RuntimeException

object FutureTimeout {

  import scala.concurrent.ExecutionContext.Implicits.global

  implicit class FutureTimeoutLike[T](f: Future[T]) {
    def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
      Thread.sleep(ms)
      throw new TimeoutException
    }))

    lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
  }

}

Пример использования

import FutureTimeout._
Future { /* do smth */ } withTimeout

Если вы хотите, чтобы автором (держателем обещаний) был тот, кто контролирует логику тайм-аута, используйте akka.pattern.after следующим образом:

val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
Future.firstCompletedOf(Seq(promiseRef.future, timeout))

Таким образом, если ваша логика завершения обещания никогда не выполняется, будущее вашего абонента все равно будет завершено в какой-то момент с ошибкой.

Вы можете указать время ожидания при ожидании на будущее:

За scala.concurrent.Future, result Метод позволяет указать время ожидания.

За scala.actors.Future, Futures.awaitAll позволяет указать время ожидания.

Я не думаю, что есть тайм-аут, встроенный в исполнение Future.

Никто не упоминается akka-streams, еще. Потоки имеют легкий completionTimeout метод, и применение этого к потоку с одним источником работает как будущее.

Но akka-streams также выполняет отмену, поэтому он может фактически прекратить работу источника, т.е. он сигнализирует об истечении времени ожидания источника.

Эта версия работает без использования тайм-аута

import scala.concurrent._
import scala.concurrent.duration.FiniteDuration

object TimeoutFuture {
    def apply[A](
        timeout: FiniteDuration
    )(block: => A)(implicit executor: ExecutionContext): Future[A] =
        try {
            Future { Await.result(Future { block }, timeout) }
        } catch {
            case _: TimeoutException => Future.failed(new TimeoutException(s"Timed out after ${timeout.toString}"))
        }
}

Monix Task имеет поддержку тайм-аута

import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._
import scala.concurrent.TimeoutException

val source = Task("Hello!").delayExecution(10.seconds)

// Triggers error if the source does not complete in 3 seconds after runOnComplete
val timedOut = source.timeout(3.seconds)

timedOut.runOnComplete(r => println(r))
//=> Failure(TimeoutException)

Вы можете дождаться завершения будущего, используя Await.

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

val meaningOfLife: Int = Await.result(Future(42), 1.nano)
println (meaningOfLife)

Приведенные выше отпечатки 42

Вам может понадобиться неявный ExecutionContext доступно, и в этом случае просто добавьте:

import scala.concurrent.ExecutionContext.Implicits.global

Другой способ сделать это - использовать Coevalот monix. Этот метод работает не во всех ситуациях, и вы можете прочитать о нем здесь. Основная идея состоит в том, что иногда будущее на самом деле не занимает какое-то время и возвращает результат синхронного вызова функции или значения, поэтому это будущее может быть оценено в текущем потоке. Это также полезно для тестирования и имитации будущего. Также вам не нужно указывать ожидаемый тайм-аут, но все же приятно, что об этом не нужно беспокоиться.

Вы начинаете с превращения будущего в Task и оберните эту задачу в Coevalзатем скрестите пальцы, ожидая увидеть, что у вас получится. Это очень простой пример, демонстрирующий, как это работает:

Вам нужен неявный Scheduler чтобы иметь возможность его использовать:

import monix.execution.Scheduler.Implicits.global


Coeval(Task.fromFuture(Future (42)).runSyncStep).value() match {
   case Right(v) => println(v)
   case Left(task) => println("Task did not finish")
}

Вышеупомянутое завершает и печатает 42 к консоли.

Coeval(Task.fromFuture(Future {
   scala.concurrent.blocking {
      42
   }
}).runSyncStep).value() match {
   case Right(v) => println(v)
   case Left(task) => println("Task did not finish")
}

Этот пример печатает Task did not finish:

You can simply run the future to completion without giving any timeout interval by setting the timeout to infinite as below:

**import scala.concurrent.duration._  
Await.result(run(executionContext), Duration.Inf)**

run function can be as below :

def run(implicit ec: ExecutionContext) = {  
      val list = Seq(  
          Future { println("start 1"); Thread.sleep(1000); println("stop 1")},  
          Future { println("start 2"); Thread.sleep(2000); println("stop 2")},  
          Future { println("start 3"); Thread.sleep(3000); println("stop 3")},  
          Future { println("start 4"); Thread.sleep(4000); println("stop 4")},  
          Future { println("start 5"); Thread.sleep(5000); println("stop 5")}  
      )  
      Future.sequence(list)  
    }  

Я использую эту версию (основанную на примере Play выше), которая использует диспетчер системы Akka:

object TimeoutFuture {
  def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = {
    implicit val executionContext = system.dispatcher

    val prom = Promise[A]

    // timeout logic
    system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic
    Future {
      try {
        prom success block
      } catch {
        case t: Throwable => prom tryFailure t
      }
    }

    prom.future
  }
}

Самый простой способ указать время ожидания в Future IMO - это встроенный механизм scala, использующий scala.concurrent.Await.ready Это бросит TimeoutException если будущее занимает больше времени, чем указанное время ожидания. Иначе оно вернет само Будущее. Вот простой надуманный пример

import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.duration._
val f1: Future[Int] = Future {
  Thread.sleep(1100)
  5
}

val fDoesntTimeout: Future[Int] = Await.ready(f1, 2000 milliseconds)

val f: Future[Int] = Future {
  Thread.sleep(1100)
  5
}
val fTimesOut: Future[Int] = Await.ready(f, 100 milliseconds)
Другие вопросы по тегам