Создайте Spark StructType / Schema из класса дел
Если бы я хотел создать StructType
(т.е. DataFrame.schema
) из case class
Есть ли способ сделать это без создания DataFrame
? Я легко могу сделать:
case class TestCase(id: Long)
val schema = Seq[TestCase]().toDF.schema
Но, кажется, излишне создавать DataFrame
когда все, что я хочу, это схема.
(Если вам интересно, причина вопроса в том, что я определяю UserDefinedAggregateFunction
и для этого вы переопределяете несколько методов, которые возвращают StructTypes
и я использую тематические классы.)
3 ответа
Вы можете сделать это так же SQLContext.createDataFrame
Является ли:
import org.apache.spark.sql.catalyst.ScalaReflection
val schema = ScalaReflection.schemaFor[TestCase].dataType.asInstanceOf[StructType]
Я знаю, что этому вопросу уже почти год, но я натолкнулся на него и подумал, что другие, которые тоже могут узнать, что я только что научился использовать этот подход:
import org.apache.spark.sql.Encoders
val mySchema = Encoders.product[MyCaseClass].schema
В случае, если кто-то хочет сделать это для пользовательского компонента Java:
ExpressionEncoder.javaBean(Event.class).schema().json()
Вместо того, чтобы вручную воспроизводить логику для создания неявного Encoder
объект, который передается toDF
можно использовать это непосредственно (или, точнее, неявно так же, как toDF
):
// spark: SparkSession
import spark.implicits._
implicitly[Encoder[MyCaseClass]].schema
К сожалению, это на самом деле страдает от той же проблемы, что и использование org.apache.spark.sql.catalyst
или же Encoders
как и в других ответах: Encoder
черта экспериментальная.
Как это работает? toDF
метод на Seq
исходит от DatasetHolder
, который создается через неявный localSeqToDatasetHolder
импортируется через spark.implicits._
, Эта функция определяется как:
implicit def localSeqToDatasetHolder[T](s: Seq[T])(implicit arg0: Encoder[T]): DatasetHolder[T]
Как вы можете видеть, это занимает implicit
Encoder[T]
аргумент, который, для case class
, может быть вычислено с помощью newProductEncoder
(также импортируется через spark.implicits._
). Мы можем воспроизвести эту неявную логику, чтобы получить Encoder
для нашего случая класса, через удобство scala.Predef.implicitly
(в объеме по умолчанию, потому что это из Predef
) который просто возвращает запрошенный неявный аргумент:
def implicitly[T](implicit e: T): T