Как установить точность и масштаб десятичного типа возвращаемого значения в Spark UDF?

Вот мой пример кода. Я ожидаю, что тип возвращаемого значения из UDF будет десятичным (16,4), но он является десятичным (38,18).

Есть ли лучшее решение?

Я НЕ жду ответа "cast (price as decimal(16,4))", так как в моем UDF есть другая бизнес-логика, кроме простого преобразования.

Заранее спасибо.

import scala.util.Try
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.Decimal
val spark = SparkSession.builder().master("local[*]").appName("Test").getOrCreate()
import spark.implicits._

val stringToDecimal = udf((s:String, precision:Int, scale: Int) => {
  Try(Decimal(BigDecimal(s), precision, scale)).toOption
})

spark.udf.register("stringToDecimal", stringToDecimal)

val inDf = Seq(
  ("1", "864.412"),
  ("2", "1.600"),
  ("3", "2,56")).toDF("id", "price")

val outDf = inDf.selectExpr("id", "stringToDecimal(price, 16, 4) as price")
outDf.printSchema()
outDf.show()

------------------output----------------
root
  |-- id: string (nullable = true)
  |-- price: decimal(38,18) (nullable = true)

+---+--------------------+
| id|               price|
+---+--------------------+
|  1|864.4120000000000...|
|  2|1.600000000000000000|
|  3|                null|
+---+--------------------+

3 ответа

Решение

Партнеры Spark Decimal с участием decimal(38, 18). Вам нужно явное приведение

$"price".cast(DataTypes.createDecimalType(32,2))

Что касается Spark 3.0 и ниже, вы не можете установить точность и масштаб в десятичном формате, возвращаемом пользовательской функцией Spark (UDF), поскольку точность и масштаб стираются при создании UDF.

Объяснение

Чтобы создать UDF, либо вызвав функцию udf с лямбда / функцией в качестве аргумента или путем прямой регистрации лямбда / функции как UDF с помощью sparkSession.udf.register, Spark необходимо преобразовать типы аргументов и вернуть тип лямбда / функции в DataType Spark.

Для этого Spark будет использовать метод schemaForв классе ScalaReflection для сопоставления типов scala с DataType Spark.

Для BigDecimal или Decimal типа, отображение выполняется следующим образом:

       case t if isSubtype(t, localTypeOf[BigDecimal]) =>
  Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
case t if isSubtype(t, localTypeOf[java.math.BigDecimal]) =>
  Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
case t if isSubtype(t, localTypeOf[Decimal]) =>
  Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)

Это означает, что когда ваша лямбда / функция возвращает либо BigDecimal или Decimal, тип возврата UDF будет DecimalType.SYSTEM_DEFAULT. DecimalType.SYSTEM_DEFAULT тип это Decimal с точностью до 38 и шкалой 18:

       val MAX_PRECISION = 38
...
val SYSTEM_DEFAULT: DecimalType = DecimalType(MAX_PRECISION, 18)

Заключение

Таким образом, каждый раз, когда вы трансформируете лямбду или функцию, возвращающую Decimal или BigDecimal в UDF Spark точность и масштаб стираются с точностью по умолчанию 38 и масштабом 18.

Итак, ваш единственный способ - следовать предыдущему ответу и передавать возвращаемое значение UDF при его вызове

Для использования pyspark:

      from pysprak.sql.types import DecimalType
def your_func(value):
    ...
spark.udf.register("your_func", your_func, DecimalType(precision=25, scale=10))
Другие вопросы по тегам