Проблема с операцией обновления и удаления Apache Hudi в файле Parquet S3

Здесь я пытаюсь смоделировать обновления и удаления в наборе данных Hudi и хочу, чтобы состояние отражалось в таблице Athena. Мы используем сервисы EMR, S3 и Athena AWS.

  1. Попытка обновления записи с помощью объекта вывода
      withdrawalID_mutate = 10382495
updateDF = final_df.filter(col("withdrawalID") == withdrawalID_mutate) \ 
    .withColumn("accountHolderName", lit("Hudi_Updated"))  
    
updateDF.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save(tablePath) 
    
hudiDF = spark.read \
    .format("hudi") \
    .load(tablePath).filter(col("withdrawalID") == withdrawalID_mutate).show() 

Показывает обновленную запись, но фактически добавляется в таблицу Athena. Наверное, что-то связано с Каталогом клея?

  1. Попытка удалить запись
      deleteDF = updateDF #deleting the updated record above 
    
deleteDF.write.format("hudi") \ 
    .option('hoodie.datasource.write.operation', 'upsert') \
    .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \
    .options(**hudi_options) \
    .mode("append") \
    .save(tablePath) 

по-прежнему отражает удаленную запись в таблице Афины

Также пробовал использовать mode("overwrite") но, как и ожидалось, он удаляет старые разделы и сохраняет только самые последние.

Кто-нибудь сталкивался с такой же проблемой и может ли направить в правильном направлении?

0 ответов

Другие вопросы по тегам