Как выполнить инициализацию в спарк?
Я хочу выполнить geoip-поиск моих данных в spark. Для этого я использую базу данных MaxMind GeoIP.
Я хочу инициализировать объект базы данных geoip один раз в каждом разделе, а затем использовать его для поиска города, связанного с IP-адресом.
Есть ли в spark фаза инициализации для каждого узла, или я должен вместо этого проверить, является ли переменная экземпляра неопределенной, и если да, инициализировать ее перед продолжением? Например, что-то вроде (это Python, но я хочу решение Scala):
class IPLookup(object):
database = None
def getCity(self, ip):
if not database:
self.database = self.initialise(geoipPath)
...
Конечно, для этого требуется искра, которая сериализует весь объект, что предостерегает документация.
3 ответа
Это похоже на хорошее использование широковещательной переменной. Вы смотрели на документацию для этой функциональности, и если у вас есть, она каким-то образом не соответствует вашим требованиям?
В Spark для каждого раздела операции можно выполнить с помощью:
def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)
Этот картограф выполнит функцию f
один раз за раздел по итератору элементов. Идея состоит в том, что стоимость установки ресурсов (например, соединений с БД) будет компенсирована использованием таких ресурсов по ряду элементов в итераторе.
Пример:
val logsRDD = ???
logsRDD.mapPartitions{iter =>
val geoIp = new GeoIPLookupDB(...)
// this is local map over the iterator - do not confuse with rdd.map
iter.map(elem => (geoIp.resolve(elem.ip),elem))
}
Как уже упоминалось @bearrito - вы можете использовать загрузку GeoDB, а затем транслировать ее из вашего драйвера. Другим вариантом, который стоит рассмотреть, является предоставление внешнего сервиса, который вы можете использовать для поиска. Это может быть кэш в памяти, такой как Redis/Memcached/Tacheyon или обычное хранилище данных.