Spark-kafka - каталог контрольных точек, дублирующий данные
Ниже сценарий, который я протестировал
- Я произвел 200 записей, и он создал 20 файлов по 10 записей в каждом.
- (После ожидания в течение 2 минут) я снова произвел 200 записей и немедленно убил приложение (используя yarn -kill). На этот раз он создал только 7 файлов.
- После перезапуска приложения оно снова создало 20 файлов, созданных на шаге 2.
В приведенном выше случае данные в 7 файлах дублируются. Как я могу контролировать это поведение, используя проверку наведения?
Ниже мой код:
try{
val kafkaOutput = result.writeStream
.outputMode("append")
.format("orc")
.option("path", "/warehouse/test_duplicate/download/data1")
.option("checkpointLocation", checkpoint_loc)
.option("maxRecordsPerFile", 10)
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()
.awaitTermination()
result.checkpoint()
}catch {
case e: Exception => e.printStackTrace
}
finally
{
println("**********Finally Called***********")
result.checkpoint()
}