Pyspark с Zeppelin: распределение файлов по узлам кластера в сравнении с SparkContext.addFile()

У меня есть библиотека, которую я создал, и которую я хочу сделать доступной для всех узлов кластера pyspark (1.6.3). Я запускаю тестовые программы на этом искровом кластере через Zeppelin (0.7.3).

Файлы, которые я хочу, находятся в репозитории github. Поэтому я клонировал этот репозиторий на все узлы кластера и создал скрипт через pssh, чтобы обновить их все одновременно. Таким образом, файлы существуют в заданном месте на каждом узле, и я хочу, чтобы они были доступны для каждого узла.

Я пробовал это

import sys
sys.path.insert(0, "/opt/repo/folder/")

from module import function
return_rdd = function(arguments)

Это привело к ошибке стека:

  File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/usr/hdp/current/spark-client/python/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/hdp/current/spark-client/python/pyspark/serializers.py", line 439, in loads
    return pickle.loads(obj, encoding=encoding)
ImportError: No module named 'module'

Я нахожу эту ошибку необычной, поскольку она вызвана вызовом рассола. Код, по-видимому, загружает фрейм данных и разбивает его на части, но происходит сбой только тогда, когда вызывается другая функция в модуле для секционированной df, преобразованной в rdd. Я не уверен, где и почему здесь задействован маринованный колл; модуль pyscript не должен подвергаться травлению, поскольку рассматриваемые модули уже должны находиться в sys.path на каждом узле кластера.

С другой стороны, я смог заставить это работать

sc.addFile("/opt/repo/folder/module.py")
import sys
from pyspark import SparkFiles
sys.path.insert(0, SparkFiles.getRootDirectory())

from module import function
return_rdd = function(arguments)

Есть идеи, почему первый подход не работает?

0 ответов

Возможное решение:

sc.addFile("/opt/repo/folder/module.py")
import sys
from pyspark import SparkFiles
sys.path.insert(0, SparkFiles.getRootDirectory())

from module import function
return_rdd = function(arguments)

Это не работает в кластерном режиме

Другие вопросы по тегам