Spark UDF с данными Maxmind Geo
Я пытаюсь использовать библиотеку снегоочистителя Maxmind для извлечения географических данных по каждому IP, который у меня есть в кадре данных.
Мы используем Spark SQL (spark версии 2.1.0), и я создал UDF в следующем классе:
class UdfDefinitions @Inject() extends Serializable with StrictLogging {
sparkSession.sparkContext.addFile("s3n://s3-maxmind-db/latest/GeoIPCity.dat")
val s3Config = configuration.databases.dataWarehouse.s3
val lruCacheConst = 20000
val ipLookups = IpLookups(geoFile = Some(SparkFiles.get(s3Config.geoIPFileName) ),
ispFile = None, orgFile = None, domainFile = None, memCache = false, lruCache = lruCacheConst)
def lookupIP(ip: String): LookupIPResult = {
val loc: Option[IpLocation] = ipLookups.getFile.performLookups(ip)._1
loc match {
case None => LookupIPResult("", "", "")
case Some(x) => LookupIPResult(Option(x.countryName).getOrElse(""),
x.city.getOrElse(""), x.regionName.getOrElse(""))
}
}
val lookupIPUDF: UserDefinedFunction = udf(lookupIP _)
}
Цель состоит в том, чтобы создать указатель на файл (ipLookups) вне UDF и использовать его внутри, чтобы не открывать файлы в каждой строке. Это приводит к ошибке сериализации задачи, и когда мы используем addFiles в UDF, мы получаем ошибку открытия слишком большого количества файлов (при использовании большого набора данных для небольшого набора данных это работает).
Этот поток показывает, как использовать для решения проблемы с помощью RDD, но мы хотели бы использовать Spark SQL. используя maxmind geoip в сериализованной искре
Какие-нибудь мысли? Спасибо
1 ответ
Проблема здесь в том, что IpLookups не сериализуем. Тем не менее, он делает поиск из статического файла (от того, что я собрал), так что вы должны быть в состоянии это исправить. Я бы посоветовал вам клонировать репо и сделать IpLookups Serializable. Затем, чтобы заставить его работать с искровым SQL, оберните все в класс, как вы сделали. В основной искровой работе вы можете написать что-то следующее:
val IPResolver = new MySerializableIpResolver()
val resolveIP = udf((ip : String) => IPResolver.resolve(ip))
data.withColumn("Result", resolveIP($"IP"))
Если у вас не так много разных IP-адресов, есть другое решение: вы можете сделать все в драйвере.
val ipMap = data.select("IP").distinct.collect
.map(/* calls to the non serializable IpLookups but that's ok, we are in the driver*/)
.toMap
val resolveIP = udf((ip : String) => ipMap(ip))
data.withColumn("Result", resolveIP($"IP"))