Конвертировать HttpEntity.Chunked в Array[String]

У меня следующая проблема. Я запрашиваю у сервера некоторые данные и возвращаю их как HttpEntity.Chunked. Строка ответа выглядит примерно так: до 10.000.000 строк:

[{"name":"param1","value":122343,"time":45435345},
{"name":"param2","value":243,"time":4325435},
......]

Теперь я хочу получить входящие данные в массив Array[String], где каждая строка представляет собой строку из ответа, потому что в дальнейшем она должна быть импортирована в фрейм данных apache spark. В настоящее время я делаю это нравится это:

//For the http request
trait StartHttpRequest {
  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer

  def httpRequest(data: String, path: String, targetPort: Int, host: String): Future[HttpResponse] = {
    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
      Http().outgoingConnection(host, port = targetPort)
    }
    val responseFuture: Future[HttpResponse] =
      Source.single(RequestBuilding.Post(uri = path, entity = HttpEntity(ContentTypes.`application/json`, data)))
        .via(connectionFlow)
        .runWith(Sink.head)
    responseFuture
  }
}

//result of the request
val responseFuture: Future[HttpResponse] = httpRequest(.....)

//convert to string
responseFuture.flatMap { response =>
        response.status match {
          case StatusCodes.OK =>
            Unmarshal(response.entity).to[String]
    }
}

//and then something like this, but with even more stupid stuff
responseFuture.onSuccess { str:String =>
    masterActor! str.split("""\},\{""")
}

Мой вопрос, что было бы лучшим способом получить результат в массив? Как я могу напрямую аннулировать ответную сущность? Потому что.to[Array[String]] например не работает. И поскольку впереди столько строк, могу ли я сделать это с помощью потока, чтобы быть более эффективным?

1 ответ

Решение

Отвечая на ваши вопросы не по порядку:

Как я могу напрямую аннулировать ответную сущность?

Существует существующий вопрос и ответ, связанный с отменой сортировки массива case-классов.

что будет лучшим способом получить результат в массив?

Я бы воспользовался природой Chunked и использовал бы потоки. Это позволяет одновременно выполнять обработку строк и анализ json.

Сначала вам нужен контейнерный класс и парсер:

case class Data(name : String, value : Int, time : Long)

object MyJsonProtocol extends DefaultJsonProtocol {
  implicit val dataFormat = jsonFormat3(Data)
}

Затем вы должны выполнить некоторые манипуляции, чтобы объекты json выглядели правильно:

//Drops the '[' and the ']' characters
val dropArrayMarkers = 
  Flow[ByteString].map(_.filterNot(b => b == '['.toByte || b == ']'.toByte))

val preppendBrace = 
  Flow[String].map(s => if(!s.startsWith("{")) "{" + s else s)

val appendBrace = 
  Flow[String].map(s => if(!s.endsWith("}")) s + "}" else s)

val parseJson = 
  Flow[String].map(_.parseJson.convertTo[Data])

Наконец, объедините эти потоки, чтобы преобразовать источник ByteString в объекты источника данных:

def strSourceToDataSource(source : Source[ByteString,_]) : Source[Data, _] = 
  source.via(dropArrayMarkers)
        .via(Framing.delimiter(ByteString("},{"), 256, true))
        .map(_.utf8String)
        .via(prependBrace)
        .via(appendBrace)
        .via(parseJson)

Этот источник затем может быть истощен в Seq объектов данных:

val dataSeq : Future[Seq[Data]] = 
  responseFuture flatMap { response =>
    response.status match {
      case StatusCodes.OK =>
        strSourceToDataSource(response.entity.dataBytes).runWith(Sink.seq)
    }
  }
Другие вопросы по тегам