Объединение схемы Spark без дубликатов?
Для обработки имеющихся у меня данных я извлекаю схему раньше, поэтому, когда я читаю набор данных, я предоставляю схему вместо того, чтобы выполнять дорогостоящий этап вывода схемы.
Чтобы построить схему, мне нужно объединить несколько различных схем в окончательную схему, поэтому я использовал union (++)
а также distinct
методы, но я продолжаю получать org.apache.spark.sql.AnalysisException: Duplicate column(s)
исключение.
Например, скажем, у нас есть две схемы в следующей структуре:
val schema1 = StructType(StructField("A", StructType(
StructField("i", StringType, true) :: Nil
), true) :: Nil)
val schema2 = StructType(StructField("A", StructType(
StructField("i", StringType, true) :: Nil
), true) :: Nil)
val schema3 = StructType(StructField("A", StructType(
StructField("i", StringType, true) ::
StructField("ii", StringType, true) :: Nil
), true) :: Nil)
val final_schema = (schema1 ++ schema2 ++ schema3).distinct
println(final_schema)
какие выводы:
StructType(
StructField(A,StructType(
StructField(i,StringType,true)),true),
StructField(A,StructType(
StructField(i,StringType,true),
StructField(ii,StringType,true)),true))
Я понимаю, что только структура схемы, которая точно соответствует другой схеме, будет отфильтрована distinct
, Однако я хочу, чтобы результат выглядел так:
StructType(
StructField(A,StructType(
StructField(i,StringType,true),
StructField(ii,StringType,true)),true))
в котором все становится "объединенным" в одну схему. Я перебрал все методы в документации scala, но я не могу найти правильный метод для решения этой проблемы. Есть идеи?
РЕДАКТИРОВАТЬ:
Конечной целью будет кормить final_schema
в sqlContext.read.schema
и читать RDD строк JSON, используя read
метод.
1 ответ
Попробуйте что-то вроде этого:
(schema1 ++ schema2 ++ schema3).groupBy(getKey).map(_._2.head)
где getKey
это функция, которая переходит от схемы к свойствам, которые вы хотите учитывать при объединении (например, имя столбца или имя подполей). в map
Функция, которую вы можете взять на себя или использовать более сложную функцию, чтобы сохранить конкретную схему.
Spark с помощью Scala:
val consolidatedSchema = test1Df.schema.++:(test2Df.schema).toSet
val uniqueConsolidatedSchemas = StructType(consolidatedSchema.toSeq)
Spark с Java:
StructType consolidatedSchema = test1Df.schema().merge(test2Df.schema());