Play2 Framework прокси потокового контента на клиенте сохраняет соединение открытым после завершения потоковой передачи

Приведенный ниже код выполняет потоковую передачу обратно клиенту, поскольку, как я понял, это более идиоматический способ, чем использование потоков ввода-вывода Java. Однако у него есть проблема: соединение остается открытым после завершения потока.

def getImage() = Action { request =>
  val imageUrl = "http://hereandthere.com/someimageurl.png"
  Ok.stream({ content: Iteratee[Array[Byte], Unit] => 
    WS.url(imageUrl).withHeaders("Accept"->"image/png").get { response => content }
    return
  }).withHeaders("Content-Type"->"image/png")
}

это предназначено для потоковой передачи больших (>1 МБ) файлов из внутреннего API запросчику.

Вопрос в том, почему он держит соединение открытым? Есть ли что-то, что он ожидает от вышестоящего сервера? Я проверил вышестоящий сервер, используя curl, и соединение закрывается - оно просто не закрывается при прохождении через этот прокси.

3 ответа

Решение

Причина, по которой поток не завершается, заключается в том, что EOF не отправляется итерируемому, который возвращается из вызова WS.get(). Без этого явного EOF соединение остается открытым - так как оно находится в чанкованном режиме и потенциально является длительным кометоподобным соединением.

Вот фиксированный код:

Ok.stream({ content: Iteratee[Array[Byte], Unit] => 
  WS.url(imageUrl)
    .withHeaders("Accept"->"image/png")
    .get { response => content }
    .onRedeem { ii =>
       ii.feed(Input.EOF)
    }
}).withHeaders("Content-Type"->"image/png")

Обновление для игры 2.2.x:

def proxy = Action.async {
  val url = "http://localhost:9000"

  def enumerator(chunks: Iteratee[Array[Byte], Unit] => _) = {
    new Enumerator[Array[Byte]] {
      def apply[C](i: Iteratee[Array[Byte], C]): Future[Iteratee[Array[Byte], C]] = {
        val doneIteratee = Promise[Iteratee[Array[Byte], C]]()
        chunks(i.map {
          done =>
            doneIteratee.success(Done[Array[Byte], C](done)).asInstanceOf[Unit]
        })
        doneIteratee.future
      }
    }
  }

  val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]()
  val resultPromise = Promise[SimpleResult]()

  WS.url(url).get {
    responseHeaders =>

      resultPromise.success(new Status(responseHeaders.status).chunked(
        enumerator({
          content: Iteratee[Array[Byte], Unit] => iterateePromise.success(content)
        }
        )).withHeaders(
        "Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
        "Connection" -> "Close"))

      Iteratee.flatten(iterateePromise.future)
  }.onComplete {
    case Success(ii) => ii.feed(Input.EOF)
    case Failure(t) => throw t
  }

  resultPromise.future
}

если у кого-то есть лучшее решение, оно меня очень интересует!

Вот модифицированная версия для игры 2.1.0. См. Https://groups.google.com/forum/#!msg/play-framework/HwoRR-nipCc/gUKs9NexCx4J.

Спасибо Анатолию Г за обмен.

def proxy = Action {

   val url = "..."

   Async {
     val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]
     val resultPromise = Promise[ChunkedResult[Array[Byte]]]

     WS.url(url).get { responseHeaders =>
       resultPromise.success {
         new Status(responseHeaders.status).stream({ content: Iteratee[Array[Byte], Unit] =>
           iterateePromise.success(content)
         }).withHeaders(
           "Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
           "Connection" -> "Close")
       }
       Iteratee.flatten(iterateePromise.future)
     }.onComplete {
       case Success(ii) => ii.feed(Input.EOF)
       case Failure(t) => resultPromise.failure(t)
     }

     resultPromise.future
   }

}
Другие вопросы по тегам