Как десериализовать записи из Kafka, используя структурированный поток в Java?

Я использую Spark 2.1.

Я пытаюсь читать записи из Kafka, используя Spark Structured Streaming, десериализовать их и впоследствии применять агрегации.

У меня есть следующий код:

SparkSession spark = SparkSession
        .builder()
        .appName("Statistics")
        .getOrCreate();

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaUri)
        .option("subscribe", "Statistics")
        .option("startingOffsets", "earliest")
        .load();

df.selectExpr("CAST(value AS STRING)")

Я хочу десериализовать value поле в мой объект вместо приведения в String,

У меня есть специальный десериализатор для этого.

public StatisticsRecord deserialize(String s, byte[] bytes)

Как я могу сделать это на Java?


Единственная соответствующая ссылка, которую я нашел, - это https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html, но это для Scala.

2 ответа

Определите схему для ваших сообщений JSON.

StructType schema = DataTypes.createStructType(new StructField[] { 
                DataTypes.createStructField("Id", DataTypes.IntegerType, false),
                DataTypes.createStructField("Name", DataTypes.StringType, false),
                DataTypes.createStructField("DOB", DataTypes.DateType, false) });

Теперь прочитайте сообщения, как показано ниже. MessageData - это JavaBean для вашего сообщения JSON.

Dataset<MessageData> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaUri)
            .option("subscribe", "Statistics")
            .option("startingOffsets", "earliest")
            .load()
            .selectExpr("CAST(value AS STRING) as message")
            .select(functions.from_json(functions.col("message"),schema).as("json"))
            .select("json.*")
            .as(Encoders.bean(MessageData.class));  

Если у вас есть собственный десериализатор в Java для ваших данных, используйте его на байтах, которые вы получаете от Kafka после load,

df.select("value")

Эта линия дает вам Dataset<Row> только с одним столбцом value,


Я использую исключительно Spark API для Scala, поэтому я бы сделал следующее в Scala, чтобы разобраться с делом "десериализация":

import org.apache.spark.sql.Encoders
implicit val statisticsRecordEncoder = Encoders.product[StatisticsRecord]
val myDeserializerUDF = udf { bytes => deserialize("hello", bytes) }
df.select(myDeserializerUDF($"value") as "value_des")

Это должно дать вам то, что вы хотите... в Скала. Преобразование его в Java - ваше домашнее упражнение:)

Имейте в виду, что ваш пользовательский объект должен иметь доступный кодер, иначе Spark SQL откажется помещать свои объекты в набор данных.

Другие вопросы по тегам