Используя maxmind geoip в сериализованной искре

Я пытаюсь использовать MaxMind GeoIP API для scala-spark, который находится по https://github.com/snowplow/scala-maxmind-iplookups. Я загружаю в файл, используя стандартные:

val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)

У меня есть базовый CSV-файл, который я загружаю, который содержит время и IP-адреса:

val sweek1 = week1.map{line=> IP(parse(line))}.collect{
  case Some(ip) => {
    val ipadress = ipdetect(ip.ip)
    (ip.time, ipadress)
    }
}

Функция ipdetect в основном определяется следующим образом:

def ipdetect(a:String)={
  ipLookups.performLookups(a)._1 match{
    case Some(value) => value.toString
    case _ => "Unknown"
  }
}

Когда я запускаю эту программу, появляется сообщение "Задача не сериализуема". Поэтому я прочитал несколько постов, и, кажется, есть несколько способов обойти это.

1, обертка2, используя SparkContext.addFile (который распределяет файл по кластеру)

но я не могу понять, как работает какой-либо из них, я попробовал обертку, но я не знаю, как и где это назвать. Я попробовал addFile, но он возвращает Unit вместо String, который, как я полагаю, вам понадобится каким-то образом передать в двоичный файл. Поэтому я не уверен, что делать сейчас. Любая помощь высоко ценится

Таким образом, я смог несколько сериализовать его с помощью mapPartitions и выполнять итерации по каждому локальному разделу, но мне интересно, есть ли более эффективный способ сделать это, поскольку у меня есть набор данных в диапазоне миллионов

1 ответ

Решение

Предположим, что ваш CSV-файл содержит IP-адрес в каждой строке, и, например, вы хотите сопоставить каждый IP-адрес с городом.

import com.snowplowanalytics.maxmind.iplookups.IpLookups

val geoippath = "path/to/geoip.dat"
val sc = new SparkContext(new SparkConf().setAppName("IP Converter"))
sc.addFile(geoippath)

def parseIP(ip:String, ipLookups: IpLookups): String = {
  val lookupResult = ipLookups.performLookups(ip)
  val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
}

val logs = sc.textFile("path/to/your.csv")
             .mapWith(_ => IpLookups(geoFile = Some(SparkFiles.get("geoip.dat"))))(parseIP)

Для других IP-преобразований, пожалуйста, обратитесь к Scala MaxMind IP Lookups. Более того, mapWith кажется устаревшим. использование mapPartitionsWithIndex вместо.

Другие вопросы по тегам