Объединение схемы Spark без дубликатов?

Для обработки имеющихся у меня данных я извлекаю схему раньше, поэтому, когда я читаю набор данных, я предоставляю схему вместо того, чтобы выполнять дорогостоящий этап вывода схемы.

Чтобы построить схему, мне нужно объединить несколько различных схем в окончательную схему, поэтому я использовал union (++) а также distinct методы, но я продолжаю получать org.apache.spark.sql.AnalysisException: Duplicate column(s) исключение.

Например, скажем, у нас есть две схемы в следующей структуре:

val schema1 = StructType(StructField("A", StructType(
    StructField("i", StringType, true) :: Nil
    ), true) :: Nil)

val schema2 = StructType(StructField("A", StructType(
    StructField("i", StringType, true) :: Nil
    ), true) :: Nil)

val schema3 = StructType(StructField("A", StructType(
    StructField("i", StringType, true) ::
    StructField("ii", StringType, true) :: Nil
    ), true) :: Nil)

val final_schema = (schema1 ++ schema2 ++ schema3).distinct

println(final_schema)

какие выводы:

StructType(
    StructField(A,StructType(
         StructField(i,StringType,true)),true), 
    StructField(A,StructType(
        StructField(i,StringType,true),    
        StructField(ii,StringType,true)),true))

Я понимаю, что только структура схемы, которая точно соответствует другой схеме, будет отфильтрована distinct, Однако я хочу, чтобы результат выглядел так:

StructType(
    StructField(A,StructType(
        StructField(i,StringType,true),    
        StructField(ii,StringType,true)),true))

в котором все становится "объединенным" в одну схему. Я перебрал все методы в документации scala, но я не могу найти правильный метод для решения этой проблемы. Есть идеи?

РЕДАКТИРОВАТЬ:

Конечной целью будет кормить final_schema в sqlContext.read.schema и читать RDD строк JSON, используя read метод.

1 ответ

Решение

Попробуйте что-то вроде этого:

(schema1 ++ schema2 ++ schema3).groupBy(getKey).map(_._2.head)

где getKey это функция, которая переходит от схемы к свойствам, которые вы хотите учитывать при объединении (например, имя столбца или имя подполей). в map Функция, которую вы можете взять на себя или использовать более сложную функцию, чтобы сохранить конкретную схему.

Spark с помощью Scala:

val consolidatedSchema = test1Df.schema.++:(test2Df.schema).toSet
val uniqueConsolidatedSchemas = StructType(consolidatedSchema.toSeq)

Spark с Java:

StructType consolidatedSchema = test1Df.schema().merge(test2Df.schema());
Другие вопросы по тегам