Как выполнить инициализацию в спарк?

Я хочу выполнить geoip-поиск моих данных в spark. Для этого я использую базу данных MaxMind GeoIP.

Я хочу инициализировать объект базы данных geoip один раз в каждом разделе, а затем использовать его для поиска города, связанного с IP-адресом.

Есть ли в spark фаза инициализации для каждого узла, или я должен вместо этого проверить, является ли переменная экземпляра неопределенной, и если да, инициализировать ее перед продолжением? Например, что-то вроде (это Python, но я хочу решение Scala):

class IPLookup(object):
    database = None

    def getCity(self, ip):
      if not database:
        self.database = self.initialise(geoipPath)
  ...

Конечно, для этого требуется искра, которая сериализует весь объект, что предостерегает документация.

3 ответа

Решение

Это похоже на хорошее использование широковещательной переменной. Вы смотрели на документацию для этой функциональности, и если у вас есть, она каким-то образом не соответствует вашим требованиям?

В Spark для каждого раздела операции можно выполнить с помощью:

def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)

Этот картограф выполнит функцию f один раз за раздел по итератору элементов. Идея состоит в том, что стоимость установки ресурсов (например, соединений с БД) будет компенсирована использованием таких ресурсов по ряду элементов в итераторе.

Пример:

val logsRDD = ???
logsRDD.mapPartitions{iter =>
   val geoIp = new GeoIPLookupDB(...)
   // this is local map over the iterator - do not confuse with rdd.map
   iter.map(elem => (geoIp.resolve(elem.ip),elem)) 
}

Как уже упоминалось @bearrito - вы можете использовать загрузку GeoDB, а затем транслировать ее из вашего драйвера. Другим вариантом, который стоит рассмотреть, является предоставление внешнего сервиса, который вы можете использовать для поиска. Это может быть кэш в памяти, такой как Redis/Memcached/Tacheyon или обычное хранилище данных.

Другие вопросы по тегам