Как получить ошибку от akka-stream, распространяющуюся на akka-http, чтобы оба были зарегистрированы и правильно уведомлять клиента?
Щас пользуюсь akka-stream
а также akka-HTTP
построить API потоковой передачи файлов. Таким образом, я внедряю потоковый источник в объект, чтобы данные передавались напрямую HTTP-клиенту следующим образом:
complete(HttpEntity(ContentTypes.`application/octet-stream`, source))
Однако, если по какой-то причине поток завершается неудачно, соединение закрывается с помощью akka-http без дальнейшего объяснения или регистрации.
Мне нужно 2 вещи:
- Как я могу получить журналы исключений?
- Как я могу уведомить своего клиента сообщением перед закрытием соединения?
Спасибо
1 ответ
Как уже упоминалось в комментарии, протокол HTTP не позволяет сигнализировать об ошибке клиентской стороне.
Что касается ведения журнала: Для меня это сводится к отсутствию правильной директивы журнала доступа в akka http.
В моем текущем проекте у нас есть декоратор, который регистрирует обработчик onComplete для сущности http, прежде чем передать его akka http для рендеринга.
private def onResponseStreamEnd(response: HttpResponse)(action: StatusCode => Unit): HttpResponse =
if (!response.status.allowsEntity() || response.entity.isKnownEmpty()) {
action(response.status)
response
} else {
val dataBytes =
onStreamEnd(response.entity) { result =>
val overallStatusCode =
result match {
case Success(_) =>
response.status
case Failure(e) =>
logger.error(e, s"error streaming response [${e.getMessage}]")
StatusCodes.InternalServerError
}
action(overallStatusCode)
}
response.withEntity(response.entity.contentLengthOption match {
case Some(length) => HttpEntity(response.entity.contentType, length, dataBytes)
case None => HttpEntity(response.entity.contentType, dataBytes)
})
}
private def onStreamEnd(entity: HttpEntity)(onComplete: Try[Done] ⇒ Unit): Source[ByteString, _] =
entity.dataBytes.alsoTo { Sink.onComplete(onComplete) }
Использование:
complete(onResponseStreamEnd(HttpResponse(StatusCodes.OK, HttpEntity(ContentTypes.`application/octet-stream`, source))){ statusCode => .... })
Аналогичный подход, но с использованием пользовательского этапа графа вы можете найти здесь