Spark-kafka - каталог контрольных точек, дублирующий данные

Ниже сценарий, который я протестировал

  1. Я произвел 200 записей, и он создал 20 файлов по 10 записей в каждом.
  2. (После ожидания в течение 2 минут) я снова произвел 200 записей и немедленно убил приложение (используя yarn -kill). На этот раз он создал только 7 файлов.
  3. После перезапуска приложения оно снова создало 20 файлов, созданных на шаге 2.

В приведенном выше случае данные в 7 файлах дублируются. Как я могу контролировать это поведение, используя проверку наведения?

Ниже мой код:

 try{
  val kafkaOutput = result.writeStream
  .outputMode("append")
  .format("orc")
  .option("path", "/warehouse/test_duplicate/download/data1")
  .option("checkpointLocation", checkpoint_loc)
  .option("maxRecordsPerFile", 10)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
  .awaitTermination() 

   result.checkpoint()

}catch {
        case e: Exception => e.printStackTrace

    }
finally
{
  println("**********Finally Called***********")
  result.checkpoint()
}

0 ответов

Другие вопросы по тегам