Конвейер Databricks DLT с сообщением об ошибке for..loop «AnalysisException: невозможно переопределить набор данных»
У меня есть следующий код, который отлично работает для одной таблицы. Но когда я пытаюсь использовать for..loop() для обработки всех таблиц в моей базе данных, я получаю сообщение об ошибке,"AnalysisException: Cannot redefine dataset 'source_ds',Map(),Map(),List(),List(),Map())"
.
Мне нужно передать имя таблицы в source_ds, чтобы обработать CDC на основе key & sequence_columns. Оцените любую помощь/предложения, пожалуйста.
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
raw_db_name = "raw_db"
def generate_silver_tables(target_table, source_table, keys_col_list):
@dlt.table
def source_ds():
return spark.table(f"{raw_db_name}.{source_table}")
### Create the target table definition
dlt.create_target_table(name=target_table,
comment= f"Clean, merged {target_table}",
#partition_cols=["topic"],
table_properties={
"quality": "silver",
"pipelines.autoOptimize.managed": "true"
}
)
## Do the merge
dlt.apply_changes(
target = target_table,
source = "source_ds",
keys = keys_col_list,
apply_as_deletes = expr("operation = 'DELETE'"),
sequence_by = col("ts_ms"),
ignore_null_updates = False,
except_column_list = ["operation", "timestamp_ms"],
stored_as_scd_type = "1"
)
return
# THIS WORKS FINE
#---------------
# raw_dbname = "raw_db"
# raw_tbl_name = 'raw_table'
# processed_tbl_name = raw_tbl_name.replace("raw", "processed")
# generate_silver_tables(processed_tbl_name, raw_tbl_name)
table_list = spark.sql(f"show tables in landing_db ").collect()
for row in table_list:
landing_tbl_name = row.tableName
s2 = spark.sql(f"select key from {landing_db_name}.{landing_tbl_name} limit 1")
keys_col_list = list(json.loads(s2.collect()[0][0]).keys())
raw_tbl_name = landing_tbl_name.replace("landing", "raw")
processed_tbl_name = landing_tbl_name.replace("landing", "processed")
generate_silver_tables(processed_tbl_name, raw_tbl_name, keys_col_list)
# time.sleep(10)
1 ответ
Вам нужно дать уникальные имена каждой таблице, предоставивname
отнести кdlt.table
аннотацию для исходной таблицы, а затем использовать то же имя вapply_changes
. В противном случае это будет взято из имени функции и завершится ошибкой, потому что вы уже определили эту функцию. Так:
def generate_silver_tables(target_table, source_table, keys_col_list):
@dlt.table(
name=source_table
)
def source_ds():
return spark.table(f"{raw_db_name}.{source_table}")
### Create the target table definition
dlt.create_target_table(name=target_table,
comment= f"Clean, merged {target_table}",
#partition_cols=["topic"],
table_properties={
"quality": "silver",
"pipelines.autoOptimize.managed": "true"
}
)
## Do the merge
dlt.apply_changes(
target = target_table,
source = source_table,
keys = keys_col_list,
apply_as_deletes = expr("operation = 'DELETE'"),
sequence_by = col("ts_ms"),
ignore_null_updates = False,
except_column_list = ["operation", "timestamp_ms"],
stored_as_scd_type = "1"
)
return
Полный пример см. в Поваренной книге DLT .