Как погрузить строку как json в поток кинезиса flink с помощью Scala?

Как мы можем создать переменную pojo как json в потоке kinesis flink:

      val inputStream: DataStream[Array[Byte]] = env.addSource {
  loadConsumerOrFail(config, jobName)
}
inputStream.print()
val transformedStream: DataStream[String] = inputStream.map { jsonstr =>
  val sJson = JsonMethods.parse((jsonstr.map(_.toChar)).mkString)
  val payloadJsonValue = sJson \ "line"

  implicit val formats = DefaultFormats
  val payvalue = JsonMethods.compact(JsonMethods.render(payloadJsonValue)).replace("\"", "")
  val payloadBytes = base64Decoder.decode(payvalue)

  val collectorPayload  = new CollectorPayload
  thriftDeserializer.deserialize(collectorPayload, payloadBytes)

  badStream(collectorPayload.ipAddress,
    collectorPayload.userAgent,
    collectorPayload.timestamp,
    collectorPayload.refererUri,
    collectorPayload.hostname,
    (sJson \ "failure_tstamp").extract[String],
    collectorPayload.body,
    collectorPayload.toString)

}
transformedStream.addSink(loadProducerOrFail(config, jobName))

здесь transformedStream будет погружен в другой кинезис, но как json, но как преобразовать в json

0 ответов

Другие вопросы по тегам