Как преобразовать DataSet<Row> в DataSet сообщений JSON для записи в Kafka?

Я использую Spark 2.1.1.

У меня есть следующее DataSet<Row> DS1;

 name   | ratio | count  // column names
"hello" |  1.56 | 34 

(ds1.isStreaming дает true)

и я пытаюсь создать DataSet<String> ds2. другими словами, когда я пишу в кафку, я хочу написать что-то вроде этого

{"name": "hello", "ratio": 1.56, "count": 34}

Я пробовал что-то подобное df2.toJSON().writeStream().foreach(new KafkaSink()).start() но тогда это дает следующую ошибку

Queries with streaming sources must be executed with writeStream.start()

Есть to_json а также json_tuple Однако я не уверен, как использовать их здесь?


Я попробовал следующее, используя json_tuple() функция

 Dataset<String> df4 = df3.select(json_tuple(new Column("result"), " name", "ratio", "count")).as(Encoders.STRING());

и я получаю следующую ошибку:

не могу решить "resultзаданные входные столбцы: [имя, соотношение, количество];;

1 ответ

Решение

TL; Dr Использование struct функция с последующим to_json (как toJSON был сломан для потоковых наборов данных из-за SPARK-17029, который был исправлен только 20 дней назад).


Процитируем скаладок структуры:

struct (colName: String, colNames: String *): Column Создает новый столбец структуры, который состоит из нескольких входных столбцов.

Учитывая, что вы используете Java API, у вас есть 4 различных варианта функции структуры:

public static Column struct (Column... cols) Создает новый столбец структуры.

С функцией to_json ваш случай покрывается:

public static Column to_json (Column e) Преобразует столбец, содержащий StructType, в строку JSON с указанной схемой.

Ниже приведен код Scala (его перевод на Java - ваше домашнее упражнение):

val ds1 = Seq(("hello", 1.56, 34)).toDF("name", "ratio", "count")
val recordCol = to_json(struct("name", "ratio", "count")) as "record"
scala> ds1.select(recordCol).show(truncate = false)
+----------------------------------------+
|record                                  |
+----------------------------------------+
|{"name":"hello","ratio":1.56,"count":34}|
+----------------------------------------+

Я также попробовал ваше решение (со Spark 2.3.0-SNAPSHOT, созданным сегодня), и кажется, что оно работает отлично.

val fromKafka = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select('value cast "string")
fromKafka.
  toJSON. // <-- JSON conversion
  writeStream.
  format("console"). // using console sink
  start

format("kafka") был добавлен в SPARK-19719 и недоступен в 2.1.0.

Другие вопросы по тегам