Проверьте файл при потоковой передаче файла csv с помощью scala
Я работаю с потоковой передачей и не хочу обрабатывать старые файлы, когда новые потоковые файлы появляются каждые 10 минут:
val val1= spark
.read //
.option("header", "true")
.option("schema", "true")
.option("sep", ",")
.csv(path_to_file).toDF().cache()
val1.registerTempTable("test")
после создания фрейма данных я делаю некоторые преобразования и обрабатываю контрольную точку, которая может помочь мне и как я использовал в моем случае
0 ответов
*****************решение*******************
val spark = SparkSession.builder.appName ("test").config ("spark.local", "local [*]").getOrCreate () spark.sparkContext.setCheckpointDir (path_checkpoint) и после того, как я вызываю функцию контрольной точки на датафрейм И я указал триггер для выполнения задания
.writeStream
.format("csv")
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.option("checkpointLocation",CheckPoint)
.trigger(Trigger.ProcessingTime("180 seconds"))
.option("Path",Path )
.option("header", true)
.outputMode("Append")
.queryName("test")
.start()