Использование функции самоопределения данных в Spark Structured Stream
Я прочитал следующий блог и нашел API очень полезным.
В блоге есть много примеров выбора данных. Как с использованием ввода
{
"a": {
"b": 1
}
}
Применять Scala: events.select("a.b")
, результат будет
{
"b": 1
}
Но преобразование типов данных не упоминается в блоге. Сказав, у меня есть следующий вход:
{
"timestampInSec": "1514917353",
"ip": "123.39.76.112",
"money": "USD256",
"countInString": "6"
}
Ожидаемый результат:
{
"timestamp": "2018-01-02 11:22:33",
"ip_long": 2066173040,
"currency": "USD",
"money_amount": 256,
"count": 6
}
Есть некоторые преобразования, которые не включены в org.apache.spark.sql.functions._
:
- Отметка времени находится во втором и является строковым типом
- Конвертировать IP в длинный
- Трещина
USD256
в два столбца и преобразовать один столбец в число - Преобразовать строку в число
Другое дело обработка ошибок и значение по умолчанию. Если есть неправильный ввод, такой как:
{
"timestampInSec": "N/A",
"money": "999",
"countInString": "Number-Six"
}
Ожидается, что выход может быть
{
"timestamp": "1970-01-01 00:00:00",
"ip_long": 0,
"currency": "NA",
"money_amount": 999,
"count": -1
}
- вход
timestampInSec
это не число. Ожидается использование 0 и создание строки метки времени в качестве возвращаемого значения. ip
отсутствует на входе. Ожидается использование значения по умолчанию 0.money
поле не заполнено. У него есть сумма денег, но пропущенная валюта. Ожидается использоватьNA
в качестве валюты по умолчанию и правильно перевестиmoney_amount
countInString
это не число. Ожидается использовать-1
(не 0) в качестве значения по умолчанию.
Эти требования не являются общими и нуждаются в некотором самоопределенном коде бизнес-логики.
Я проверил некоторые функции, такие как to_timestamp
, Есть кое-что кодовое и кажется не очень легко добавлять новые функции. Есть ли какое-нибудь руководство / документ по написанию самоопределяемой функции преобразования? Есть ли простой способ удовлетворить требования?
1 ответ
Для всех:
import org.apache.spark.sql.functions._
Отметка времени находится во втором и является строковым типом
val timestamp = coalesce( $"timestampInSec".cast("long").cast("timestamp"), lit(0).cast("timestamp") ).alias("timestamp")
Разделите USD256 на два столбца и конвертируйте один столбец в число
val currencyPattern = "^([A-Z]+)?([0-9]+)$" val currency = (trim(regexp_extract($"money", currencyPattern, 1)) match { case c => when(length(c) === 0, "NA").otherwise(c) }).alias("currency") val amount = regexp_extract($"money", currencyPattern, 2) .cast("decimal(38, 0)").alias("money_amount")
Преобразовать строку в число
val count = coalesce($"countInString".cast("long"), lit(-1)).alias("count")
Конвертировать IP в длинный
val ipPattern = "^([0-9]{1,3})\\.([0-9]{1,3})\\.([0-9]{1,3})\\.([0-9]{1,3})" val ip_long = coalesce(Seq((1, 24), (2, 16), (3, 8), (4, 0)).map { case (group, numBits) => shiftLeft( regexp_extract($"ip", ipPattern, group).cast("long"), numBits ) }.reduce(_ + _), lit(0)).alias("ip_long")
Результат
val df = Seq(
("1514917353", "123.39.76.112", "USD256", "6"),
("N/A", null, "999", null)
).toDF("timestampInSec", "ip", "money", "countInString")
df.select(timestamp, currency, amount, count, ip_long).show
// +-------------------+--------+------------+-----+----------+
// | timestamp|currency|money_amount|count| ip_long|
// +-------------------+--------+------------+-----+----------+
// |2018-01-02 18:22:33| USD| 256| 6|2066173040|
// |1970-01-01 00:00:00| NA| 999| -1| 0|
// +-------------------+--------+------------+-----+----------+