Как выполнить оператор слияния Spark SQL для таблицы Iceberg в Databricks?

Я пытаюсь настроить Apache Iceberg в нашей среде Databricks, и при выполнении инструкции в Spark SQL возникает ошибка.

Этот код:

      CREATE TABLE iceberg.db.table (id bigint, data string) USING iceberg;

INSERT INTO iceberg.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');

INSERT INTO iceberg.db.table SELECT id, data FROM (select * from iceberg.db.table) t WHERE length(data) = 1;

MERGE INTO iceberg.db.table t USING (SELECT * FROM iceberg.db.table) u ON t.id = u.id
WHEN NOT MATCHED THEN INSERT *

Генерирует эту ошибку:

      Error in SQL statement: AnalysisException: MERGE destination only supports Delta sources.
Some(RelationV2[id#116L, data#117] iceberg.db.table

Я считаю, что корень проблемы в том, что это тоже ключевое слово для движка Delta Lake SQL. Насколько я могу судить, эта проблема связана с порядком, в котором Spark пытается выполнить план. MERGEзапускает дельта-правило, а затем выдает ошибку, потому что это не дельта-таблица. Я могу без проблем читать, добавлять и перезаписывать таблицы айсбергов.

Основной вопрос: как я могу заставить Spark распознавать это как запрос Iceberg, а не как Delta? Или можно вообще удалить правила SQL, связанные с дельтой?

Среда

Версия Spark: 3.0.1

Версия времени выполнения Databricks: 7.6

Конфиги айсберга

      spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type=hadoop
spark.sql.catalog.iceberg.warehouse=BLOB_STORAGE_CONTAINER

Трассировки стека:

      com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: org.apache.spark.sql.AnalysisException: MERGE destination only supports Delta sources.
Some(RelationV2[id#116L, data#117] iceberg.db.table
);
    at com.databricks.sql.transaction.tahoe.DeltaErrors$.notADeltaSourceException(DeltaErrors.scala:343)
    at com.databricks.sql.transaction.tahoe.PreprocessTableMerge.apply(PreprocessTableMerge.scala:201)
    at com.databricks.sql.transaction.tahoe.PreprocessTableMergeEdge$$anonfun$apply$1.applyOrElse(PreprocessTableMergeEdge.scala:39)
    at com.databricks.sql.transaction.tahoe.PreprocessTableMergeEdge$$anonfun$apply$1.applyOrElse(PreprocessTableMergeEdge.scala:36)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:112)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:112)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:216)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:110)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:108)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:73)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:72)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:29)
    at com.databricks.sql.transaction.tahoe.PreprocessTableMergeEdge.apply(PreprocessTableMergeEdge.scala:36)
    at com.databricks.sql.transaction.tahoe.PreprocessTableMergeEdge.apply(PreprocessTableMergeEdge.scala:29)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:152)```

3 ответа

Я считаю, что ошибка здесь в том, что Databricks всегда вытесняет другие расширения, добавленные в сеанс Spark. Это означает, что вы не можете выполнить кодовый путь Iceberg, и когда-либо будут использоваться только расширения Databricks. Я хотел бы спросить вашего представителя Databricks, есть ли способ разрешить размещение расширений Iceberg в первую очередь или они могут рассмотреть возможность использования других реализаций слияния.

Не совсем то, что вы ищете, но Databricks позволяет преобразовать таблицу Iceberg на месте (без копирования данных) в таблицу Delta —

https://docs.databricks.com/delta/delta-utility.html#convert-iceberg-to-delta

Требуется DBR 10.4+

      -- Convert the Iceberg table in the path <path-to-table>.
CONVERT TO DELTA iceberg.`<path-to-table>`

-- Convert the Iceberg table in the path <path-to-table> without collecting statistics.
CONVERT TO DELTA iceberg.`<path-to-table>` NO STATISTICS

Затем запустите MERGE для таблицы Delta.

Если бы у Iceberg было такое же обновление от Iceberg до Delta на месте (я не уверен), это решило бы первоначальную проблему.

Для недельта-источников разрешены только операции INSERT. Операции DELETE и MERGE не допускаются.

Другие вопросы по тегам