Таблицы Delta Live с использованием SCD типа 1
Я пытаюсь загрузить данные с помощью DLT и SCD 1 и сталкиваюсь с сообщением об ошибке «Обнаружено обновление данных в исходной таблице версии x. В настоящее время это не поддерживается. Если вы хотите игнорировать обновления, установите параметр «игнорировать изменения» в «истина».
У меня есть один именованный устав DLT и еще один именованный филиал. Таблица ответвлений имеет отношение к чартеру, поэтому я присоединяюсь к ней в конвейере, чтобы найти чартер_ключ. Первоначальная загрузка данных проходит нормально, но когда я запускаю добавочную часть на следующий день, я получаю сообщение об ошибке. Эти данные довольно статичны, поэтому данные фактически не изменились между начальной и дополнительной загрузкой. Однако в созданной таблице DLT charter __apply_changes_storage_charter __UpsertVersion имеет самое последнее имя файла и обновленную __Timestamp. Это где он говорит, что обнаружил изменение?
Мой код из конвейера DLT приведен ниже. Таблицы _bronze просто берут необработанный файл паркета и помещают его во временную потоковую таблицу в реальном времени, поэтому я не включаю этот код.
CREATE OR REFRESH STREAMING LIVE TABLE charter
(charter_key bigint GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1)
,charter_number int
,charter_name string
,charter_address1 string
,charter_address2 string
,charter_zip_code string)
APPLY CHANGES INTO LIVE.charter
FROM STREAM(LIVE.charter_bronze)
KEYS (charter_number)
SEQUENCE BY file_name
COLUMNS * EXCEPT (file_name)
CREATE OR REFRESH TEMPORARY STREAMING LIVE TABLE branch_stage AS
SELECT c.charter_key
,b.branch_number
,b.branch_name
,b.branch_address1
,b.branch_address2
,b.branch_zip_code
,b.file_name
FROM STREAM(LIVE.charter) c
INNER JOIN STREAM(LIVE.branch_bronze) b
ON c.charter_number = b.charter_number
CREATE OR REFRESH STREAMING LIVE TABLE branch
(branch_key bigint GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1)
,charter_key bigint
,branch_number int
,branch_name string
,branch_address1 string
,branch_address2 string
,branch_zip_code string)
APPLY CHANGES INTO LIVE.branch
FROM STREAM(LIVE.branch_stage)
KEYS (charter_key, branch_number)
SEQUENCE BY file_name
COLUMNS * EXCEPT (file_name)
Я читал, что он обрабатывает файлы только как добавление, но как именно работает SCD? В конвейере у меня оба установлены на true:
конвейеры.applyChangesPreviewEnabled
spark.databricks.delta.schema.autoMerge.enabled
Любая помощь приветствуется.