Как установить точность и масштаб десятичного типа возвращаемого значения в Spark UDF?
Вот мой пример кода. Я ожидаю, что тип возвращаемого значения из UDF будет десятичным (16,4), но он является десятичным (38,18).
Есть ли лучшее решение?
Я НЕ жду ответа "cast (price as decimal(16,4))", так как в моем UDF есть другая бизнес-логика, кроме простого преобразования.
Заранее спасибо.
import scala.util.Try
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.Decimal
val spark = SparkSession.builder().master("local[*]").appName("Test").getOrCreate()
import spark.implicits._
val stringToDecimal = udf((s:String, precision:Int, scale: Int) => {
Try(Decimal(BigDecimal(s), precision, scale)).toOption
})
spark.udf.register("stringToDecimal", stringToDecimal)
val inDf = Seq(
("1", "864.412"),
("2", "1.600"),
("3", "2,56")).toDF("id", "price")
val outDf = inDf.selectExpr("id", "stringToDecimal(price, 16, 4) as price")
outDf.printSchema()
outDf.show()
------------------output----------------
root
|-- id: string (nullable = true)
|-- price: decimal(38,18) (nullable = true)
+---+--------------------+
| id| price|
+---+--------------------+
| 1|864.4120000000000...|
| 2|1.600000000000000000|
| 3| null|
+---+--------------------+
3 ответа
Партнеры Spark Decimal
с участием decimal(38, 18)
. Вам нужно явное приведение
$"price".cast(DataTypes.createDecimalType(32,2))
Что касается Spark 3.0 и ниже, вы не можете установить точность и масштаб в десятичном формате, возвращаемом пользовательской функцией Spark (UDF), поскольку точность и масштаб стираются при создании UDF.
Объяснение
Чтобы создать UDF, либо вызвав функцию
udf
с лямбда / функцией в качестве аргумента или путем прямой регистрации лямбда / функции как UDF с помощью
sparkSession.udf.register
, Spark необходимо преобразовать типы аргументов и вернуть тип лямбда / функции в DataType Spark.
Для этого Spark будет использовать метод
schemaFor
в классе ScalaReflection для сопоставления типов scala с DataType Spark.
Для
BigDecimal
или
Decimal
типа, отображение выполняется следующим образом:
case t if isSubtype(t, localTypeOf[BigDecimal]) =>
Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
case t if isSubtype(t, localTypeOf[java.math.BigDecimal]) =>
Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
case t if isSubtype(t, localTypeOf[Decimal]) =>
Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
Это означает, что когда ваша лямбда / функция возвращает либо
BigDecimal
или
Decimal
, тип возврата UDF будет DecimalType.SYSTEM_DEFAULT.
DecimalType.SYSTEM_DEFAULT
тип это
Decimal
с точностью до 38 и шкалой 18:
val MAX_PRECISION = 38
...
val SYSTEM_DEFAULT: DecimalType = DecimalType(MAX_PRECISION, 18)
Заключение
Таким образом, каждый раз, когда вы трансформируете лямбду или функцию, возвращающую
Decimal
или
BigDecimal
в UDF Spark точность и масштаб стираются с точностью по умолчанию 38 и масштабом 18.
Итак, ваш единственный способ - следовать предыдущему ответу и передавать возвращаемое значение UDF при его вызове
Для использования pyspark:
from pysprak.sql.types import DecimalType
def your_func(value):
...
spark.udf.register("your_func", your_func, DecimalType(precision=25, scale=10))