Схема Spark из класса case с правильной обнуляемостью
Для пользовательского метода transformSchema оценщика мне нужно уметь сравнивать схему входного фрейма данных со схемой, определенной в классе наблюдения. Обычно это может быть выполнено как Генерация Spark StructType / Schema из класса наблюдения, как описано ниже. Однако используется неправильная обнуляемость:
Реальная схема ДФ, выведенная spark.read.csv().as[MyClass]
может выглядеть так:
root
|-- CUSTOMER_ID: integer (nullable = false)
И случай дела:
case class MySchema(CUSTOMER_ID: Int)
Для сравнения я использую:
val rawSchema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType]
if (!rawSchema.equals(rawDf.schema))
К сожалению, это всегда дает false
поскольку новая схема, выведенная вручную из класса case, устанавливает для Nullable значение true
(потому что ja java.Integer на самом деле может быть нулевым)
root
|-- CUSTOMER_ID: integer (nullable = true)
Как я могу указать nullable = false
при создании схемы?
1 ответ
Возможно, вы смешиваете вещи, которые на самом деле не принадлежат одному пространству. ML Pipelines по своей природе динамичны, и введение статически типизированных объектов на самом деле не меняет этого.
Кроме того схема для класса определяется как:
case class MySchema(CUSTOMER_ID: Int)
будет не обнуляемым CUSTOMER_ID
, scala.Int
это не то же самое, что java.lang.Integer
:
scala> import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
scala> case class MySchema(CUSTOMER_ID: Int)
defined class MySchema
scala> schemaFor[MySchema].dataType
res0: org.apache.spark.sql.types.DataType = StructType(StructField(CUSTOMER_ID,IntegerType,false))
Это, как говорится, если вы хотите nullable
поля Option[Int]
:
case class MySchema(CUSTOMER_ID: Option[Int])
и если вы хотите, чтобы обнулять использовать Int
как указано выше.
Еще одна проблема, с которой вы столкнулись csv
по определению каждое поле обнуляется, и это состояние "наследуется" закодированным Dataset
, Итак, на практике:
spark.read.csv(...)
всегда приведет к:
root
|-- CUSTOMER_ID: integer (nullable = true)
и именно поэтому вы получаете несоответствие схемы. К сожалению, невозможно переопределить nullable
поле для источников, которые не обеспечивают ограничения обнуляемости, такие как csv
или же json
,
Если наличие необнуляемой схемы является сложным требованием, вы можете попробовать:
spark.createDataFrame(
spark.read.csv(...).rdd,
schemaFor[MySchema].dataType.asInstanceOf[StructType]
).as[MySchema]
Этот подход действителен, только если вы знаете, что данные на самом деле null
свободно. любой null
значение приведет к исключению времени выполнения.