Как получить ошибку от akka-stream, распространяющуюся на akka-http, чтобы оба были зарегистрированы и правильно уведомлять клиента?

Щас пользуюсь akka-stream а также akka-HTTP построить API потоковой передачи файлов. Таким образом, я внедряю потоковый источник в объект, чтобы данные передавались напрямую HTTP-клиенту следующим образом:

complete(HttpEntity(ContentTypes.`application/octet-stream`, source))

Однако, если по какой-то причине поток завершается неудачно, соединение закрывается с помощью akka-http без дальнейшего объяснения или регистрации.

Мне нужно 2 вещи:

  • Как я могу получить журналы исключений?
  • Как я могу уведомить своего клиента сообщением перед закрытием соединения?

Спасибо

1 ответ

Решение

Как уже упоминалось в комментарии, протокол HTTP не позволяет сигнализировать об ошибке клиентской стороне.

Что касается ведения журнала: Для меня это сводится к отсутствию правильной директивы журнала доступа в akka http.

В моем текущем проекте у нас есть декоратор, который регистрирует обработчик onComplete для сущности http, прежде чем передать его akka http для рендеринга.

  private def onResponseStreamEnd(response: HttpResponse)(action: StatusCode => Unit): HttpResponse =
    if (!response.status.allowsEntity() || response.entity.isKnownEmpty()) {
      action(response.status)
      response
    } else {
      val dataBytes =
        onStreamEnd(response.entity) { result =>
          val overallStatusCode =
            result match {
              case Success(_) =>
                response.status

              case Failure(e) =>
                logger.error(e, s"error streaming response [${e.getMessage}]")
                StatusCodes.InternalServerError
            }

          action(overallStatusCode)
        }

      response.withEntity(response.entity.contentLengthOption match {
        case Some(length) => HttpEntity(response.entity.contentType, length, dataBytes)
        case None         => HttpEntity(response.entity.contentType, dataBytes)
      })
    }

  private def onStreamEnd(entity: HttpEntity)(onComplete: Try[Done] ⇒ Unit): Source[ByteString, _] =
    entity.dataBytes.alsoTo { Sink.onComplete(onComplete) }

Использование:

complete(onResponseStreamEnd(HttpResponse(StatusCodes.OK, HttpEntity(ContentTypes.`application/octet-stream`, source))){ statusCode => .... })

Аналогичный подход, но с использованием пользовательского этапа графа вы можете найти здесь

Другие вопросы по тегам