Play2 Framework прокси потокового контента на клиенте сохраняет соединение открытым после завершения потоковой передачи
Приведенный ниже код выполняет потоковую передачу обратно клиенту, поскольку, как я понял, это более идиоматический способ, чем использование потоков ввода-вывода Java. Однако у него есть проблема: соединение остается открытым после завершения потока.
def getImage() = Action { request =>
val imageUrl = "http://hereandthere.com/someimageurl.png"
Ok.stream({ content: Iteratee[Array[Byte], Unit] =>
WS.url(imageUrl).withHeaders("Accept"->"image/png").get { response => content }
return
}).withHeaders("Content-Type"->"image/png")
}
это предназначено для потоковой передачи больших (>1 МБ) файлов из внутреннего API запросчику.
Вопрос в том, почему он держит соединение открытым? Есть ли что-то, что он ожидает от вышестоящего сервера? Я проверил вышестоящий сервер, используя curl, и соединение закрывается - оно просто не закрывается при прохождении через этот прокси.
3 ответа
Причина, по которой поток не завершается, заключается в том, что EOF не отправляется итерируемому, который возвращается из вызова WS.get(). Без этого явного EOF соединение остается открытым - так как оно находится в чанкованном режиме и потенциально является длительным кометоподобным соединением.
Вот фиксированный код:
Ok.stream({ content: Iteratee[Array[Byte], Unit] =>
WS.url(imageUrl)
.withHeaders("Accept"->"image/png")
.get { response => content }
.onRedeem { ii =>
ii.feed(Input.EOF)
}
}).withHeaders("Content-Type"->"image/png")
Обновление для игры 2.2.x:
def proxy = Action.async {
val url = "http://localhost:9000"
def enumerator(chunks: Iteratee[Array[Byte], Unit] => _) = {
new Enumerator[Array[Byte]] {
def apply[C](i: Iteratee[Array[Byte], C]): Future[Iteratee[Array[Byte], C]] = {
val doneIteratee = Promise[Iteratee[Array[Byte], C]]()
chunks(i.map {
done =>
doneIteratee.success(Done[Array[Byte], C](done)).asInstanceOf[Unit]
})
doneIteratee.future
}
}
}
val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]()
val resultPromise = Promise[SimpleResult]()
WS.url(url).get {
responseHeaders =>
resultPromise.success(new Status(responseHeaders.status).chunked(
enumerator({
content: Iteratee[Array[Byte], Unit] => iterateePromise.success(content)
}
)).withHeaders(
"Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
"Connection" -> "Close"))
Iteratee.flatten(iterateePromise.future)
}.onComplete {
case Success(ii) => ii.feed(Input.EOF)
case Failure(t) => throw t
}
resultPromise.future
}
если у кого-то есть лучшее решение, оно меня очень интересует!
Вот модифицированная версия для игры 2.1.0. См. Https://groups.google.com/forum/#!msg/play-framework/HwoRR-nipCc/gUKs9NexCx4J.
Спасибо Анатолию Г за обмен.
def proxy = Action {
val url = "..."
Async {
val iterateePromise = Promise[Iteratee[Array[Byte], Unit]]
val resultPromise = Promise[ChunkedResult[Array[Byte]]]
WS.url(url).get { responseHeaders =>
resultPromise.success {
new Status(responseHeaders.status).stream({ content: Iteratee[Array[Byte], Unit] =>
iterateePromise.success(content)
}).withHeaders(
"Content-Type" -> responseHeaders.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head,
"Connection" -> "Close")
}
Iteratee.flatten(iterateePromise.future)
}.onComplete {
case Success(ii) => ii.feed(Input.EOF)
case Failure(t) => resultPromise.failure(t)
}
resultPromise.future
}
}