Hudi Создание небольших файлов размером 0kb в базовом пути при использовании Spark Structured Streaming

val query = newDf.coalesce(1).writeStream.
      outputMode("append").format("hudi")
    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
      .option("hoodie.datasource.write.recordkey.field", "value,score")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"region,year")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "year")
      .option(HoodieWriteConfig.TABLE_NAME, tableName)
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.ComplexKeyGenerator")
      .option("hoodie.index.type","GLOBAL_BLOOM")
      .option("hoodie.bloom.index.update.partition.path","true")
      .option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS")
      .option("hoodie.keep.max.commits","2")
      .option("hoodie.keep.min.commits","1")
      .option("hoodie.cleaner.commits.retained","0")
      .option("checkpointLocation","/demo/app2")
      .option("path","/tmp/kafka")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .option("hoodie.clean.automatic","true")
      .start("/tmp/kafka")


    query.awaitTermination()

Хотя паркетные файлы, созданные в папке kafka, имеют соответствующий размер, но для каждого триггера микропакета в папку tmp записывается один небольшой пустой файл 0kb

0 ответов

Другие вопросы по тегам