Spark2 Kafka Структурированная потоковая передача Java не знает функцию from_json
У меня вопрос по поводу структурированной потоковой передачи Spark на Kafka.
У меня есть схема типа:
StructType schema = new StructType()
.add("field1", StringType)
.add("field2", StringType)
.add("field3", StringType)
.add("field4", StringType)
.add("field5", StringType);
Я загружаю свой поток из темы Кафки как:
Dataset<Row> ds1 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "brokerlist")
.option("zookeeper.connect", "zk_url")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load();
следующее преобразование в строку, тип строки:
Dataset<Row> df1 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
Теперь я хотел бы преобразовать поле значения (которое является JSON) в ранее преобразованную схему, которая должна упростить запросы SQL:
Dataset<Row> df2 = df1.select(from_json("value", schema=schema).as("data").select("single_column_field");
Похоже, что Spark 2.3.1 не знает from_json
функционировать?
Это мой импорт:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
Есть идеи, как это решить? Обратите внимание, что я ищу не решение Scala, а решение, основанное исключительно на Java!
1 ответ
Этот код работает для меня. Надеюсь, это поможет
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.34.216:9092")
.option("subscribe", "topicName")
.load()
//df.show();
import spark.implicits._
val comingXDR = df.select("value").as[String].withColumn("_tmp", split($"value", "\\,")).withColumn("MyNewColumnName1", $"_tmp".getItem(0)).withColumn("MyNewColumnName2", $"_tmp".getItem(1)).withColumn("MyNewColumnName3", $"_tmp".getItem(2)).withColumn("MyNewColumnName4", $"_tmp".getItem(3)).drop("value").drop("_tmp")