Создайте Spark StructType / Schema из класса дел

Если бы я хотел создать StructType (т.е. DataFrame.schema) из case classЕсть ли способ сделать это без создания DataFrame? Я легко могу сделать:

case class TestCase(id: Long)
val schema = Seq[TestCase]().toDF.schema

Но, кажется, излишне создавать DataFrame когда все, что я хочу, это схема.

(Если вам интересно, причина вопроса в том, что я определяю UserDefinedAggregateFunctionи для этого вы переопределяете несколько методов, которые возвращают StructTypes и я использую тематические классы.)

3 ответа

Решение

Вы можете сделать это так же SQLContext.createDataFrame Является ли:

import org.apache.spark.sql.catalyst.ScalaReflection
val schema = ScalaReflection.schemaFor[TestCase].dataType.asInstanceOf[StructType]

Я знаю, что этому вопросу уже почти год, но я натолкнулся на него и подумал, что другие, которые тоже могут узнать, что я только что научился использовать этот подход:

import org.apache.spark.sql.Encoders
val mySchema = Encoders.product[MyCaseClass].schema

В случае, если кто-то хочет сделать это для пользовательского компонента Java:

ExpressionEncoder.javaBean(Event.class).schema().json()

Вместо того, чтобы вручную воспроизводить логику для создания неявного Encoder объект, который передается toDF можно использовать это непосредственно (или, точнее, неявно так же, как toDF):

// spark: SparkSession

import spark.implicits._

implicitly[Encoder[MyCaseClass]].schema

К сожалению, это на самом деле страдает от той же проблемы, что и использование org.apache.spark.sql.catalyst или же Encoders как и в других ответах: Encoder черта экспериментальная.

Как это работает? toDF метод на Seq исходит от DatasetHolder, который создается через неявный localSeqToDatasetHolderимпортируется через spark.implicits._, Эта функция определяется как:

implicit def localSeqToDatasetHolder[T](s: Seq[T])(implicit arg0: Encoder[T]): DatasetHolder[T]

Как вы можете видеть, это занимает implicitEncoder[T] аргумент, который, для case class, может быть вычислено с помощью newProductEncoder (также импортируется через spark.implicits._). Мы можем воспроизвести эту неявную логику, чтобы получить Encoder для нашего случая класса, через удобство scala.Predef.implicitly (в объеме по умолчанию, потому что это из Predef) который просто возвращает запрошенный неявный аргумент:

def implicitly[T](implicit e: T): T
Другие вопросы по тегам