Используя maxmind geoip в сериализованной искре
Я пытаюсь использовать MaxMind GeoIP API для scala-spark, который находится по https://github.com/snowplow/scala-maxmind-iplookups. Я загружаю в файл, используя стандартные:
val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)
У меня есть базовый CSV-файл, который я загружаю, который содержит время и IP-адреса:
val sweek1 = week1.map{line=> IP(parse(line))}.collect{
case Some(ip) => {
val ipadress = ipdetect(ip.ip)
(ip.time, ipadress)
}
}
Функция ipdetect в основном определяется следующим образом:
def ipdetect(a:String)={
ipLookups.performLookups(a)._1 match{
case Some(value) => value.toString
case _ => "Unknown"
}
}
Когда я запускаю эту программу, появляется сообщение "Задача не сериализуема". Поэтому я прочитал несколько постов, и, кажется, есть несколько способов обойти это.
1, обертка2, используя SparkContext.addFile (который распределяет файл по кластеру)
но я не могу понять, как работает какой-либо из них, я попробовал обертку, но я не знаю, как и где это назвать. Я попробовал addFile, но он возвращает Unit вместо String, который, как я полагаю, вам понадобится каким-то образом передать в двоичный файл. Поэтому я не уверен, что делать сейчас. Любая помощь высоко ценится
Таким образом, я смог несколько сериализовать его с помощью mapPartitions и выполнять итерации по каждому локальному разделу, но мне интересно, есть ли более эффективный способ сделать это, поскольку у меня есть набор данных в диапазоне миллионов
1 ответ
Предположим, что ваш CSV-файл содержит IP-адрес в каждой строке, и, например, вы хотите сопоставить каждый IP-адрес с городом.
import com.snowplowanalytics.maxmind.iplookups.IpLookups
val geoippath = "path/to/geoip.dat"
val sc = new SparkContext(new SparkConf().setAppName("IP Converter"))
sc.addFile(geoippath)
def parseIP(ip:String, ipLookups: IpLookups): String = {
val lookupResult = ipLookups.performLookups(ip)
val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
}
val logs = sc.textFile("path/to/your.csv")
.mapWith(_ => IpLookups(geoFile = Some(SparkFiles.get("geoip.dat"))))(parseIP)
Для других IP-преобразований, пожалуйста, обратитесь к Scala MaxMind IP Lookups. Более того, mapWith
кажется устаревшим. использование mapPartitionsWithIndex
вместо.