Использование функции самоопределения данных в Spark Structured Stream

Я прочитал следующий блог и нашел API очень полезным.

https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

В блоге есть много примеров выбора данных. Как с использованием ввода

{
  "a": {
     "b": 1
  }
}

Применять Scala: events.select("a.b"), результат будет

{
  "b": 1
}

Но преобразование типов данных не упоминается в блоге. Сказав, у меня есть следующий вход:

{
  "timestampInSec": "1514917353",
  "ip": "123.39.76.112",
  "money": "USD256",
  "countInString": "6"
}

Ожидаемый результат:

{
  "timestamp": "2018-01-02 11:22:33",
  "ip_long": 2066173040,
  "currency": "USD",
  "money_amount": 256,
  "count": 6
}

Есть некоторые преобразования, которые не включены в org.apache.spark.sql.functions._:

  • Отметка времени находится во втором и является строковым типом
  • Конвертировать IP в длинный
  • Трещина USD256 в два столбца и преобразовать один столбец в число
  • Преобразовать строку в число

Другое дело обработка ошибок и значение по умолчанию. Если есть неправильный ввод, такой как:

{
  "timestampInSec": "N/A",
  "money": "999",
  "countInString": "Number-Six"
}

Ожидается, что выход может быть

{
  "timestamp": "1970-01-01 00:00:00",
  "ip_long": 0,
  "currency": "NA",
  "money_amount": 999,
  "count": -1
}
  • вход timestampInSec это не число. Ожидается использование 0 и создание строки метки времени в качестве возвращаемого значения.
  • ip отсутствует на входе. Ожидается использование значения по умолчанию 0.
  • money поле не заполнено. У него есть сумма денег, но пропущенная валюта. Ожидается использовать NA в качестве валюты по умолчанию и правильно перевести money_amount
  • countInString это не число. Ожидается использовать -1 (не 0) в качестве значения по умолчанию.

Эти требования не являются общими и нуждаются в некотором самоопределенном коде бизнес-логики.

Я проверил некоторые функции, такие как to_timestamp, Есть кое-что кодовое и кажется не очень легко добавлять новые функции. Есть ли какое-нибудь руководство / документ по написанию самоопределяемой функции преобразования? Есть ли простой способ удовлетворить требования?

1 ответ

Решение

Для всех:

import org.apache.spark.sql.functions._
  • Отметка времени находится во втором и является строковым типом

    val timestamp = coalesce(
       $"timestampInSec".cast("long").cast("timestamp"), 
       lit(0).cast("timestamp")
    ).alias("timestamp")
    
  • Разделите USD256 на два столбца и конвертируйте один столбец в число

    val currencyPattern = "^([A-Z]+)?([0-9]+)$"
    
    val currency = (trim(regexp_extract($"money", currencyPattern, 1)) match {
      case c => when(length(c) === 0, "NA").otherwise(c)
    }).alias("currency")
    val amount = regexp_extract($"money", currencyPattern, 2)
      .cast("decimal(38, 0)").alias("money_amount") 
    
  • Преобразовать строку в число

    val count = coalesce($"countInString".cast("long"), lit(-1)).alias("count")
    
  • Конвертировать IP в длинный

    val ipPattern = "^([0-9]{1,3})\\.([0-9]{1,3})\\.([0-9]{1,3})\\.([0-9]{1,3})"
    val ip_long = coalesce(Seq((1, 24), (2, 16), (3, 8), (4, 0)).map {
      case (group, numBits) => shiftLeft(
        regexp_extract($"ip", ipPattern, group).cast("long"),
        numBits
      )
    }.reduce(_ + _), lit(0)).alias("ip_long")
    

Результат

val df = Seq(
  ("1514917353", "123.39.76.112", "USD256", "6"),
  ("N/A", null, "999", null)
).toDF("timestampInSec", "ip", "money", "countInString")

df.select(timestamp, currency, amount, count, ip_long).show
// +-------------------+--------+------------+-----+----------+
// |          timestamp|currency|money_amount|count|   ip_long|
// +-------------------+--------+------------+-----+----------+
// |2018-01-02 18:22:33|     USD|         256|    6|2066173040|
// |1970-01-01 00:00:00|      NA|         999|   -1|         0|
// +-------------------+--------+------------+-----+----------+
Другие вопросы по тегам