Получение дубликатов в Таблице, когда задание ETL разрушается дважды. Задание извлечения задания ETL из корзины RDS в S3
Когда задание ETL выполняется, оно выполняется должным образом, но поскольку в таблице отсутствует метка времени, оно дублирует данные при запуске того же задания ETL. Как выполнить постановку и решить эту проблему, используя Upsert или, если есть другие, вы можете ответить. Как избавиться ли я от этой проблемы, решение, которое я нахожу, это либо включить в нее метку времени, либо выполнить постановку, или есть какой-то другой способ?
2 ответа
Вы можете использовать overwrite
во время записи данных в s3. Это заменит исходные данные
Чтобы предотвратить дублирование на s3, вам нужно загрузить данные из места назначения и отфильтровать существующие записи перед сохранением:
val deltaDf = newDataDf.alias("new")
.join(existingDf.alias("existing"), "id", "left_outer")
.where(col("existing.id").isNull)
.select("new.*")
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> path
)),
transformationContext = "save_to_s3"
format = "avro"
).writeDynamicFrame(DynamicFrame(deltaDf, glueContext))
Однако этот метод не перезаписывает обновленные записи.
Другим вариантом является сохранение обновленных записей тоже с некоторыми updated_at
поле, которое может использоваться нижестоящими потребителями для получения последних значений.
Вы также можете рассмотреть возможность выгрузки набора данных в отдельную папку при каждом запуске задания (т. Е. Каждый день, когда у вас появляется полный дамп данных). data/dataset_date=<year-month-day>
)
import org.apache.spark.sql.functions._
val datedDf = sourceDf.withColumn("dataset_date", current_date())
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> path,
"partitionKeys" -> Array("dataset_date")
)),
transformationContext = "save_to_s3"
format = "avro"
).writeDynamicFrame(DynamicFrame(datedDf, glueContext))