Секвенирование `Будущее с таймаутом

Я использовал TimeoutScheduler представлен на Scala Futures - встроенный тайм-аут?,

Тем не менее, теперь моя программа не заканчивается, как раньше без TimeoutScheduler,

У меня два Futures: res1 а также res2, Оба с таймаутом 15 секунд. В конце я чередую оба Futures для правильного завершения работы HTTP-исполнителя в onComplete Перезвоните. Без использования withTimeout программа заканчивается сразу после http.shutdown, Но с использованием withTimeout это не так. Зачем? Там должно быть какое-то дальнейшее будущее...

import java.net.URI
import scala.util.{ Try, Failure, Success }
import dispatch._
import org.json4s._
import org.json4s.native.JsonMethods._
import com.typesafe.scalalogging.slf4j.Logging

object Main extends App with Logging {
  import scala.concurrent.ExecutionContext.Implicits.global

  val http   = Http
  val github = host("api.github.com").secure 

  import timeout._
  import scala.concurrent.duration._
  import scala.language.postfixOps
  import scala.collection.JavaConverters._

  val req: dispatch.Req =  github / "users" / "defunkt"
  val res1 = http(req > dispatch.as.Response(_.getHeaders().get("Server").asScala)) withTimeout (15 seconds) recover { case x => 
    logger.debug("Error1: " + x.toString) 
    Nil
  }
  val res2 = http(req > dispatch.as.Response(_.getHeaders().get("Vary").asScala)) withTimeout (15 seconds) recover { case x => 
    logger.debug("Error2: " + x.toString) 
    Nil
  }

  Future.sequence(res1 :: res2 :: Nil) onComplete { case _ => 
    http.shutdown()               // without using `withTimeout` the program terminated after `http.shutdow`
    TimeoutScheduler.timer.stop() // thanks to @cmbaxter
  }
}

object timeout {
  import java.util.concurrent.TimeUnit
  import scala.concurrent.Promise
  import scala.concurrent.duration.Duration
  import org.jboss.netty.util.Timeout
  import org.jboss.netty.util.TimerTask
  import org.jboss.netty.util.HashedWheelTimer
  import org.jboss.netty.handler.timeout.TimeoutException

   // cf. https://stackru.com/questions/16304471/scala-futures-built-in-timeout
  object TimeoutScheduler {
    val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
    def scheduleTimeout(promise: Promise[_], after: Duration) = {
      timer.newTimeout(new TimerTask{
        def run(timeout: Timeout){              
          promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))        
        }
      }, after.toNanos, TimeUnit.NANOSECONDS)
    }
  }
  implicit class FutureWithTimeout[T](f: Future[T]) {
    import scala.concurrent.ExecutionContext

    def withTimeout(after: Duration)(implicit ec: ExecutionContext) = {
      val prom = Promise[T]()
      val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
      val combinedFut = Future.firstCompletedOf(List(f, prom.future))
      f onComplete { case result => timeout.cancel() }
      combinedFut
    }
  } 
}

Любые предложения приветствуются, Best, / нм

1 ответ

Решение

Если вы использовали мой код точно так, как описано, то я думаю, что Netty HashedWheelTimer который сидит под TimeoutScheduler объект не прерывается Вы можете попробовать явно позвонить stop на это после звонка http.shutdown вот так:

TimeoutScheduler.timer.stop

Теперь, если вы хотели нетти HashedWheelTimer чтобы использовать поток демона, тогда вы можете использовать один из конструкторов (я вижу их в 3.6.6-Final), который принимает ThreadFactory а затем использовать обычай ThreadFactory который устанавливает флаг демона в true.

Другие вопросы по тегам