Откат Дельта-Лейк
Нужен элегантный способ отката Delta Lake до предыдущей версии.
Мой текущий подход указан ниже:
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, testFolder)
spark.read.format("delta")
.option("versionAsOf", 0)
.load(testFolder)
.write
.mode("overwrite")
.format("delta")
.save(testFolder)
Это уродливо, хотя, поскольку весь набор данных должен быть переписан. Кажется, что некоторого мета-обновления будет достаточно, и ввод-вывод данных не требуется. Кто-нибудь знает лучший подход для этого?
2 ответа
Как Дельта озера 0.7.0, вы можете откатиться к более ранней версии вашего стола Delta Lake, используя RESTORE команды. Это гораздо более простой способ использовать путешествия во времени для отката ваших таблиц.
Скала:
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")
deltaTable.restoreToVersion(0)
Python:
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")
deltaTable.restoreToVersion(0)
SQL:
RESTORE TABLE delta.`/path/to/delta-table` TO VERSION AS OF 0
Вы также можете использовать
restoreToTimestamp
command, если вы предпочитаете делать то же самое. Подробнее читайте в документации.
Вот жестокое решение. Это не идеально, но, учитывая, что перезапись большого набора данных разделами может быть дорогой, это простое решение может оказаться полезным.
Если вы не очень чувствительны к обновлениям после желаемого времени отката, просто удалите все файлы версий в _delta_log, которые позже времени отката. Файлы, на которые нет ссылок, могут быть освобождены позже с использованием вакуума.
Еще одно решение, которое сохраняет всю историю - это 1) deltaTable.delete
2) Скопируйте все журналы до отката последовательно (с увеличением номера версии) в конец файла журнала удаления. Это имитирует создание дельты озера до даты отката. Но это точно не красиво.
Если ваша цель - исправить неверные данные и вы не очень чувствительны к обновлениям, вы можете заменить интервал времени.
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
.save("/delta/events")
Я столкнулся с подобной проблемой с Delta, когда я вызывал несколько операций dml в одной транзакции. например, у меня было требование вызвать слияние, а затем удалить в одной транзакции. Итак, в этом случае либо они оба должны быть успешными вместе, либо откатить состояние, если какой-либо из них не работает.
Чтобы решить эту проблему, я сделал резервную копию каталога _delta_log (назовем его стабильным состоянием) перед началом транзакции. Если обе операции DML в транзакции выполнены успешно, то отмените предыдущее стабильное состояние и используйте новое состояние, зафиксированное в _delta_log, в случае сбоя какой-либо операции dml просто замените каталог _delta_log на стабильное состояние, которое вы сделали резервную копию ранее начало транзакции. После замены на стабильное состояние просто запустите вакуум, чтобы удалить устаревшие файлы, которые могли быть созданы во время транзакции.
Вы должны использовать функцию путешествия во времени: https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html
Вы читаете данные как на отметке времени:
val inputPath = "/path/to/my/table@20190101000000000"
Затем перезаписать существующие данные версией "отката".
Что касается того, что это уродливо, я не уверен, что смогу помочь. Вы можете ограничить данные, используя разбиение. Или вы можете определить, какие записи изменились, и перезаписать их.