Можем ли мы сохранить смещение Spark-SQL-Kafka в таблице MySQL вместо HDFS или S3

У меня есть простая программа Spark-SQL-Kafka, которая читает из Kafka и пишет в HDFS.

Для проверки я использовал HDFS и S3 в прошлом, он отлично работает.

Есть ли способ, где я могу использовать MySQL для проверки чека?

.option("checkpointLocation", ConfigLoader.getValue("checkpointPath"))

Как мы можем настроить таблицу MySQL?

DataStreamReader kafkaDataStreamReader = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", ConfigLoader.getValue("brokers"))
        .option("group.id", ConfigLoader.getValue("groupId"))
        .option("subscribe", ConfigLoader.getValue("topics"))
        .option("failOnDataLoss", false);
Dataset<Row> rawDataSet = kafkaDataStreamReader.load();                         
rawDataSet.createOrReplaceTempView("rawEventView");

sqlCtx.sql("select * from rawEventView")
        .writeStream()
        .partitionBy(JavaConversions.asScalaBuffer(Arrays.asList(("date_year,date_month,date_day,date_hour,date_minute").split(","))))
        .format("csv")
        .option("header", "true")
        .option("compression","gzip")
        .option("delimiter", "~")
        .option("checkpointLocation", ConfigLoader.getValue("checkpointPath"))
        .option("path", ConfigLoader.getValue("recordsPath"))
        .outputMode(OutputMode.Append())
        .trigger(ProcessingTime.create(Integer.parseInt(ConfigLoader.getValue("kafkaProcessingTimeInSeconds")), TimeUnit.SECONDS))
        .start()
        .awaitTermination();

0 ответов

Другие вопросы по тегам