Как определить схему для пользовательского типа в Spark SQL?

Следующий пример кода пытается поместить некоторые объекты case в фрейм данных. Код включает в себя определение иерархии объектов case и класса case, использующего эту черту:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data( name : String, t: Some)

object Example {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName( "Example" )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
    df.show()
  }
}    

При выполнении кода я, к сожалению, сталкиваюсь со следующим исключением:

java.lang.UnsupportedOperationException: Schema for type Some is not supported

Вопросы

  • Есть ли возможность добавить или определить схему для определенных типов (здесь тип Some)?
  • Существует ли другой подход для представления такого рода перечислений?
    • Я пытался использовать Enumeration напрямую, но тоже безуспешно. (увидеть ниже)

Код для Enumeration:

object Some extends Enumeration {
  type Some = Value
  val AType, BType = Value
}

Заранее спасибо. Я надеюсь, что лучший подход - не использовать вместо этого строки.

1 ответ

Решение

Spark 2.0.0+:

UserDefinedType был сделан приватным в Spark 2.0.0 и на данный момент не имеет Dataset дружеская замена.

См.: SPARK-14155 (Скрыть UserDefinedType в Spark 2.0)

Большую часть времени статически набирается Dataset может служить заменой Существует ожидающая версия Jira SPARK-7768, которая снова сделает UDT API общедоступным с целевой версией 2.4.

Смотрите также Как хранить пользовательские объекты в наборе данных?

Искра < 2.0.0

Есть ли возможность добавить или определить схему для определенных типов (здесь типа Некоторые)?

Я думаю, ответ зависит от того, насколько сильно вам это нужно. Похоже, что можно создать UserDefinedType но это требует доступа к DeveloperApi и не совсем однозначно или хорошо документировано.

import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some

class SomeUDT extends UserDefinedType[Some] {
  override def sqlType: DataType = IntegerType

  override def serialize(obj: Any) = {
    obj match {
      case AType => 0
      case BType => 1
    }
  }

  override def deserialize(datum: Any): Some = {
    datum match {
      case 0 => AType
      case 1 => BType
    }
  }

  override def userClass: Class[Some] = classOf[Some]
}

Вы, вероятно, должны переопределить hashCode а также equals также.

Его аналог PySpark может выглядеть так:

from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType

class SomeUDT(UserDefinedType):
    @classmethod
    def sqlType(self):
        return IntegerType()

    @classmethod
    def module(cls):
        return cls.__module__

    @classmethod 
    def scalaUDT(cls): # Required in Spark < 1.5
        return 'net.zero323.enum.SomeUDT'

    def serialize(self, obj):
        return obj.value

    def deserialize(self, datum):
        return {x.value: x for x in Some}[datum]

@unique
class Some(Enum):
    __UDT__ = SomeUDT()
    AType = 0
    BType = 1

В Spark < 1.5 Python UDT требует сопряженного Scala UDT, но, похоже, в 1.5 он больше не работает.

Для простого UDT, как вы можете использовать простые типы (например, IntegerType вместо целого Struct).

Другие вопросы по тегам