Можем ли мы сохранить смещение Spark-SQL-Kafka в таблице MySQL вместо HDFS или S3
У меня есть простая программа Spark-SQL-Kafka, которая читает из Kafka и пишет в HDFS.
Для проверки я использовал HDFS и S3 в прошлом, он отлично работает.
Есть ли способ, где я могу использовать MySQL для проверки чека?
.option("checkpointLocation", ConfigLoader.getValue("checkpointPath"))
Как мы можем настроить таблицу MySQL?
DataStreamReader kafkaDataStreamReader = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", ConfigLoader.getValue("brokers"))
.option("group.id", ConfigLoader.getValue("groupId"))
.option("subscribe", ConfigLoader.getValue("topics"))
.option("failOnDataLoss", false);
Dataset<Row> rawDataSet = kafkaDataStreamReader.load();
rawDataSet.createOrReplaceTempView("rawEventView");
sqlCtx.sql("select * from rawEventView")
.writeStream()
.partitionBy(JavaConversions.asScalaBuffer(Arrays.asList(("date_year,date_month,date_day,date_hour,date_minute").split(","))))
.format("csv")
.option("header", "true")
.option("compression","gzip")
.option("delimiter", "~")
.option("checkpointLocation", ConfigLoader.getValue("checkpointPath"))
.option("path", ConfigLoader.getValue("recordsPath"))
.outputMode(OutputMode.Append())
.trigger(ProcessingTime.create(Integer.parseInt(ConfigLoader.getValue("kafkaProcessingTimeInSeconds")), TimeUnit.SECONDS))
.start()
.awaitTermination();