Как погрузить строку как json в поток кинезиса flink с помощью Scala?
Как мы можем создать переменную pojo как json в потоке kinesis flink:
val inputStream: DataStream[Array[Byte]] = env.addSource {
loadConsumerOrFail(config, jobName)
}
inputStream.print()
val transformedStream: DataStream[String] = inputStream.map { jsonstr =>
val sJson = JsonMethods.parse((jsonstr.map(_.toChar)).mkString)
val payloadJsonValue = sJson \ "line"
implicit val formats = DefaultFormats
val payvalue = JsonMethods.compact(JsonMethods.render(payloadJsonValue)).replace("\"", "")
val payloadBytes = base64Decoder.decode(payvalue)
val collectorPayload = new CollectorPayload
thriftDeserializer.deserialize(collectorPayload, payloadBytes)
badStream(collectorPayload.ipAddress,
collectorPayload.userAgent,
collectorPayload.timestamp,
collectorPayload.refererUri,
collectorPayload.hostname,
(sJson \ "failure_tstamp").extract[String],
collectorPayload.body,
collectorPayload.toString)
}
transformedStream.addSink(loadProducerOrFail(config, jobName))
здесь transformedStream будет погружен в другой кинезис, но как json, но как преобразовать в json