UPSERT в паркете Pyspark
У меня есть паркетные файлы в s3 со следующими разделами: год / месяц / дата / some_id Используя Spark (PySpark), каждый день я хотел бы как бы UPSERT последние 14 дней - я хотел бы заменить существующие данные в s3 (один parquet для каждого раздела), но не удалять дни до 14 дней. Я попробовал два режима сохранения: добавление - не очень хорошо, потому что он просто добавляет еще один файл. перезапись - это удаление прошлых данных и данных для других разделов.
Есть ли какой-либо способ или лучший способ преодолеть это? я должен читать все данные из s3 при каждом запуске и записывать их снова? возможно, переименовать файлы так, чтобы добавление заменило текущий файл в s3?
Большое спасибо!
3 ответа
Я обычно делаю нечто подобное. В моем случае я выполняю ETL и добавляю данные за один день в файл паркета:
Ключ в том, чтобы работать с данными, которые вы хотите записать (в моем случае - с фактической датой), обязательно разбейте их по date
столбец и перезаписать все данные на текущую дату.
Это сохранит все старые данные. Например:
(
sdf
.write
.format("parquet")
.mode("overwrite")
.partitionBy("date")
.option("replaceWhere", "2020-01-27")
.save(uri)
)
Также вы можете взглянуть на https://delta.io/, который является расширением формата parquet, который предоставляет некоторые интересные функции, такие как транзакции ACID.
Всем спасибо за полезные решения. В итоге я использовал некоторую конфигурацию, которая обслуживала мой вариант использования - использование режима перезаписи при написании паркета вместе с этой конфигурацией:
Я добавил этот конфиг:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
с этой конфигурацией искра будет перезаписывать только те разделы, для которых у нее есть данные для записи. Все остальные (прошлые) разделы остаются нетронутыми - см. Здесь:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-dynamic-partition-inserts.html
Насколько мне известно, в S3 нет операции обновления. После того, как объект добавлен в s3, его нельзя изменить. (либо вам нужно заменить другой объект, либо добавить файл)
В любом случае, если вы хотите прочитать все данные, вы можете указать временную шкалу, которую хотите прочитать, сокращение разделов помогает читать только разделы на временной шкале.