Схема Spark из класса case с правильной обнуляемостью

Для пользовательского метода transformSchema оценщика мне нужно уметь сравнивать схему входного фрейма данных со схемой, определенной в классе наблюдения. Обычно это может быть выполнено как Генерация Spark StructType / Schema из класса наблюдения, как описано ниже. Однако используется неправильная обнуляемость:

Реальная схема ДФ, выведенная spark.read.csv().as[MyClass] может выглядеть так:

root
 |-- CUSTOMER_ID: integer (nullable = false)

И случай дела:

case class MySchema(CUSTOMER_ID: Int)

Для сравнения я использую:

val rawSchema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType]
  if (!rawSchema.equals(rawDf.schema))

К сожалению, это всегда дает falseпоскольку новая схема, выведенная вручную из класса case, устанавливает для Nullable значение true (потому что ja java.Integer на самом деле может быть нулевым)

root
 |-- CUSTOMER_ID: integer (nullable = true)

Как я могу указать nullable = false при создании схемы?

1 ответ

Решение

Возможно, вы смешиваете вещи, которые на самом деле не принадлежат одному пространству. ML Pipelines по своей природе динамичны, и введение статически типизированных объектов на самом деле не меняет этого.

Кроме того схема для класса определяется как:

case class MySchema(CUSTOMER_ID: Int)

будет не обнуляемым CUSTOMER_ID, scala.Int это не то же самое, что java.lang.Integer:

scala> import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor

scala> case class MySchema(CUSTOMER_ID: Int)
defined class MySchema

scala> schemaFor[MySchema].dataType
res0: org.apache.spark.sql.types.DataType = StructType(StructField(CUSTOMER_ID,IntegerType,false))

Это, как говорится, если вы хотите nullable поля Option[Int]:

case class MySchema(CUSTOMER_ID: Option[Int])

и если вы хотите, чтобы обнулять использовать Int как указано выше.

Еще одна проблема, с которой вы столкнулись csv по определению каждое поле обнуляется, и это состояние "наследуется" закодированным Dataset, Итак, на практике:

spark.read.csv(...)

всегда приведет к:

root
 |-- CUSTOMER_ID: integer (nullable = true)

и именно поэтому вы получаете несоответствие схемы. К сожалению, невозможно переопределить nullable поле для источников, которые не обеспечивают ограничения обнуляемости, такие как csv или же json,

Если наличие необнуляемой схемы является сложным требованием, вы можете попробовать:

spark.createDataFrame(
  spark.read.csv(...).rdd,
  schemaFor[MySchema].dataType.asInstanceOf[StructType]
).as[MySchema]

Этот подход действителен, только если вы знаете, что данные на самом деле null свободно. любой null значение приведет к исключению времени выполнения.

Другие вопросы по тегам