Spark UDF с данными Maxmind Geo

Я пытаюсь использовать библиотеку снегоочистителя Maxmind для извлечения географических данных по каждому IP, который у меня есть в кадре данных.

Мы используем Spark SQL (spark версии 2.1.0), и я создал UDF в следующем классе:

class UdfDefinitions @Inject() extends Serializable with StrictLogging {

 sparkSession.sparkContext.addFile("s3n://s3-maxmind-db/latest/GeoIPCity.dat")
 val s3Config = configuration.databases.dataWarehouse.s3
 val lruCacheConst = 20000
 val ipLookups = IpLookups(geoFile = Some(SparkFiles.get(s3Config.geoIPFileName) ),
  ispFile = None, orgFile = None, domainFile = None, memCache = false, lruCache = lruCacheConst)

 def lookupIP(ip: String): LookupIPResult = {
  val loc: Option[IpLocation] = ipLookups.getFile.performLookups(ip)._1
  loc match {
    case None => LookupIPResult("", "", "")
    case Some(x) => LookupIPResult(Option(x.countryName).getOrElse(""), 
   x.city.getOrElse(""), x.regionName.getOrElse(""))
   }
 }

 val lookupIPUDF: UserDefinedFunction = udf(lookupIP _)

}

Цель состоит в том, чтобы создать указатель на файл (ipLookups) вне UDF и использовать его внутри, чтобы не открывать файлы в каждой строке. Это приводит к ошибке сериализации задачи, и когда мы используем addFiles в UDF, мы получаем ошибку открытия слишком большого количества файлов (при использовании большого набора данных для небольшого набора данных это работает).

Этот поток показывает, как использовать для решения проблемы с помощью RDD, но мы хотели бы использовать Spark SQL. используя maxmind geoip в сериализованной искре

Какие-нибудь мысли? Спасибо

1 ответ

Проблема здесь в том, что IpLookups не сериализуем. Тем не менее, он делает поиск из статического файла (от того, что я собрал), так что вы должны быть в состоянии это исправить. Я бы посоветовал вам клонировать репо и сделать IpLookups Serializable. Затем, чтобы заставить его работать с искровым SQL, оберните все в класс, как вы сделали. В основной искровой работе вы можете написать что-то следующее:

val IPResolver = new MySerializableIpResolver()
val resolveIP = udf((ip : String) => IPResolver.resolve(ip))
data.withColumn("Result", resolveIP($"IP"))

Если у вас не так много разных IP-адресов, есть другое решение: вы можете сделать все в драйвере.

val ipMap = data.select("IP").distinct.collect
    .map(/* calls to the non serializable IpLookups but that's ok, we are in the driver*/)
    .toMap
val resolveIP = udf((ip : String) => ipMap(ip))
data.withColumn("Result", resolveIP($"IP"))
Другие вопросы по тегам