Блоки данных - ошибка записи из DataFrame в местоположение Delta

Я хотел изменить имя столбца таблицы дельта Databricks.

Итак, я сделал следующее:

// Read old table data
val old_data_DF = spark.read.format("delta")
.load("dbfs:/mnt/main/sales")

// Created a new DF with a renamed column
val new_data_DF = old_data_DF
      .withColumnRenamed("column_a", "metric1")
      .select("*")

// Dropped and recereated the Delta files location
dbutils.fs.rm("dbfs:/mnt/main/sales", true)
dbutils.fs.mkdirs("dbfs:/mnt/main/sales")

// Trying to write the new DF to the location
new_data_DF.write
.format("delta")
.partitionBy("sale_date_partition")
.save("dbfs:/mnt/main/sales")

Здесь я получаю сообщение об ошибке на последнем шаге при записи в Delta:

java.io.FileNotFoundException: dbfs:/mnt/main/sales/sale_date_partition=2019-04-29/part-00000-769.c000.snappy.parquet
A file referenced in the transaction log cannot be found. This occurs when data has been manually deleted from the file system rather than using the table `DELETE` statement

Очевидно, что данные были удалены, и, скорее всего, я что-то упустил в приведенной выше логике. Теперь единственное место, которое содержит данные, это new_data_DF, Запись в таком месте, как dbfs:/mnt/main/sales_tmp также не удается

Что я должен сделать, чтобы записать данные из new_data_DF в место Delta?

1 ответ

В общем, это хорошая идея, чтобы избежать использования rm на дельта столах. Журнал транзакций Delta может предотвратить возможные проблемы согласованности в большинстве случаев, однако, когда вы удаляете и воссоздаете таблицу за очень короткое время, различные версии журнала транзакций могут вспыхнуть и исчезнуть.

Вместо этого я бы рекомендовал использовать транзакционные примитивы, предоставляемые Delta. Например, чтобы перезаписать данные в таблице, вы можете:

df.write.format("delta").mode("overwrite").save("/delta/events")

Если у вас есть таблица, которая уже была повреждена, вы можете исправить это с помощью FSCK.

Вы можете сделать это следующим образом.

// Read old table data
val old_data_DF = spark.read.format("delta")
.load("dbfs:/mnt/main/sales")

// Created a new DF with a renamed column
val new_data_DF = old_data_DF
  .withColumnRenamed("column_a", "metric1")
  .select("*")

// Trying to write the new DF to the location
new_data_DF.write
.format("delta")
.mode("overwrite") // this would overwrite the whole data files
.option("overwriteSchema", "true")  //this is the key line.
.partitionBy("sale_date_partition")
.save("dbfs:/mnt/main/sales")

Параметр OverWriteSchema создаст новые физические файлы с последней схемой, которую мы обновили во время преобразования.

Другие вопросы по тегам