Синхронизация таблицы Qubole HIve со снежинкой с полем Struct

У меня есть таблица, как следующий Qubole:

use dm;

CREATE EXTERNAL TABLE IF NOT EXISTS fact (
    id string,
    fact_attr struct<
        attr1 : String,
        attr2 : String
    >
)
STORED AS PARQUET
LOCATION 's3://my-bucket/DM/fact'

Я создал параллельную таблицу в Snowflake следующим образом:

CREATE TABLE IF NOT EXISTS dm.fact (
    id string,
    fact_attr variant
)

Мой процесс ETL загружает данные в таблицу qubole, например:

+------------+--------------------------------+
| id         | fact_attr                      |
+------------+--------------------------------+
| 1          | {"attr1": "a1", "attr2": "a2"} |
| 2          | {"attr1": "a3", "attr2": null} |
+------------+--------------------------------+

Я пытаюсь синхронизировать эти данные в снежинку с помощью команды Merge, например

MERGE INTO DM.FACT dst USING %s src 
    ON dst.id = src.id
WHEN MATCHED THEN UPDATE SET
    fact_attr = parse_json(src.fact_attr)
WHEN NOT MATCHED THEN INSERT (
    id,
    fact_attr
) VALUES (
    src.id,
    parse_json(src.fact_attr)
);

Я использую PySpark для синхронизации данных:

df.write \
  .option("sfWarehouse", sf_warehouse) \
  .option("sfDatabase", sf_database) \
  .option("sfSchema", sf_schema) \
  .option("postactions", query) \
  .mode("overwrite") \
  .snowflake("snowflake", sf_warehouse, sf_temp_table)

С вышеприведенной командой я получаю следующую ошибку:

pyspark.sql.utils.IllegalArgumentException: u"Don't know how to save StructField(fact_attr,StructType(StructField(attr1,StringType,true), StructField(attr2,StringType,true)),true) of type attributes to Snowflake"

Я прочитал следующие ссылки, но безуспешно:

Вопрос:

Как я могу вставить / синхронизировать данные из таблицы Qubole Hive с полем STRUCT для снежинки?

0 ответов

Версия вашего Spark Connector for Snowflake, используемая во время попытки, не поддерживала вариантные типы данных.

Поддержка была представлена ​​в их версии соединителя 2.4.4 (выпущенной в июле 2018 г.), где поля StructType теперь автоматически сопоставляются с типом данных VARIANT, который будет работать с вашей командой MERGE.