Как преобразовать DataSet<Row> в DataSet сообщений JSON для записи в Kafka?
Я использую Spark 2.1.1.
У меня есть следующее DataSet<Row>
DS1;
name | ratio | count // column names
"hello" | 1.56 | 34
(ds1.isStreaming
дает true
)
и я пытаюсь создать DataSet<String>
ds2. другими словами, когда я пишу в кафку, я хочу написать что-то вроде этого
{"name": "hello", "ratio": 1.56, "count": 34}
Я пробовал что-то подобное df2.toJSON().writeStream().foreach(new KafkaSink()).start()
но тогда это дает следующую ошибку
Queries with streaming sources must be executed with writeStream.start()
Есть to_json
а также json_tuple
Однако я не уверен, как использовать их здесь?
Я попробовал следующее, используя json_tuple()
функция
Dataset<String> df4 = df3.select(json_tuple(new Column("result"), " name", "ratio", "count")).as(Encoders.STRING());
и я получаю следующую ошибку:
не могу решить "
result
заданные входные столбцы: [имя, соотношение, количество];;
1 ответ
TL; Dr Использование struct
функция с последующим to_json
(как toJSON
был сломан для потоковых наборов данных из-за SPARK-17029, который был исправлен только 20 дней назад).
Процитируем скаладок структуры:
struct (colName: String, colNames: String *): Column Создает новый столбец структуры, который состоит из нескольких входных столбцов.
Учитывая, что вы используете Java API, у вас есть 4 различных варианта функции структуры:
public static Column struct (Column... cols) Создает новый столбец структуры.
С функцией to_json ваш случай покрывается:
public static Column to_json (Column e) Преобразует столбец, содержащий StructType, в строку JSON с указанной схемой.
Ниже приведен код Scala (его перевод на Java - ваше домашнее упражнение):
val ds1 = Seq(("hello", 1.56, 34)).toDF("name", "ratio", "count")
val recordCol = to_json(struct("name", "ratio", "count")) as "record"
scala> ds1.select(recordCol).show(truncate = false)
+----------------------------------------+
|record |
+----------------------------------------+
|{"name":"hello","ratio":1.56,"count":34}|
+----------------------------------------+
Я также попробовал ваше решение (со Spark 2.3.0-SNAPSHOT, созданным сегодня), и кажется, что оно работает отлично.
val fromKafka = spark.
readStream.
format("kafka").
option("subscribe", "topic1").
option("kafka.bootstrap.servers", "localhost:9092").
load.
select('value cast "string")
fromKafka.
toJSON. // <-- JSON conversion
writeStream.
format("console"). // using console sink
start
format("kafka")
был добавлен в SPARK-19719 и недоступен в 2.1.0.