Pyspark --py-файлы не работают

Я использую это, как предлагает документ http://spark.apache.org/docs/1.1.1/submitting-applications.html

Spsark версия 1.1.0

./spark/bin/spark-submit --py-files /home/hadoop/loganalysis/parser-src.zip \
/home/hadoop/loganalysis/ship-test.py 

и конф в коде:

conf = (SparkConf()
        .setMaster("yarn-client")
        .setAppName("LogAnalysis")
        .set("spark.executor.memory", "1g")
        .set("spark.executor.cores", "4")
        .set("spark.executor.num", "2")
        .set("spark.driver.memory", "4g")
        .set("spark.kryoserializer.buffer.mb", "128"))

и подчиненный узел жалуются на ImportError

14/12/25 05:09:53 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-172-31-10-8.cn-north-1.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/hadoop/spark/python/pyspark/worker.py", line 75, in main
    command = pickleSer._read_with_length(infile)
  File "/home/hadoop/spark/python/pyspark/serializers.py", line 150, in _read_with_length
    return self.loads(obj)
ImportError: No module named parser

и parser-src.zip протестирован локально.

[hadoop@ip-172-31-10-231 ~]$ python
Python 2.7.8 (default, Nov  3 2014, 10:17:30) 
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> sys.path.insert(1, '/home/hadoop/loganalysis/parser-src.zip')
>>> from parser import parser
>>> parser.parse
<function parse at 0x7fa5ef4c9848>
>>> 

Я пытаюсь получить информацию об удаленном работнике. посмотрим, скопировал ли он файлы. как выглядит sys.path... и это сложно.

ОБНОВЛЕНИЕ: я использую это, обнаружил, что почтовый файл был отправлен. и sys.path был установлен. все еще импортировать получить ошибку.

data = list(range(4))
disdata = sc.parallelize(data)
result = disdata.map(lambda x: "sys.path:  {0}\nDIR: {1}   \n FILES: {2} \n parser: {3}".format(sys.path, os.getcwd(), os.listdir('.'), str(parser)))
result.collect()
print(result.take(4))

Кажется, мне нужно копаться в cloudpickle. Это означает, что мне нужно понять, как работает cloudpickle и вначале происходит сбой.

: An error occurred while calling o40.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 4 times, most recent failure: Lost task 4.3 in stage 0.0 (TID 23, ip-172-31-10-8.cn-north-1.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/hadoop/spark/python/pyspark/worker.py", line 75, in main
    command = pickleSer._read_with_length(infile)
  File "/home/hadoop/spark/python/pyspark/serializers.py", line 150, in _read_with_length
    return self.loads(obj)
  File "/home/hadoop/spark/python/pyspark/cloudpickle.py", line 811, in subimport
    __import__(name)
ImportError: ('No module named parser', <function subimport at 0x7f219ffad7d0>, ('parser.parser',))

ОБНОВИТЬ:

кто-то сталкивается с той же проблемой в спарк 0,8 http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-Importing-other-py-files-in-PYTHONPATH-td2301.html

но он поместил свою библиотеку в дистрибутивы Python и импортирует работы. который я пробовал и до сих пор получаю ошибку импорта.

ОБНОВИТЬ:

OH.gush.. Я думаю, что проблема вызвана непониманием zip-файла и поведения при импорте python.. Я передаю parser.py в --py-files, он работает, жалуюсь на другую зависимость. и zip только файлы.py [не включая.pyc], кажется, тоже работает.

Но я не мог понять почему.

7 ответов

Попробуйте эту функцию SparkContext

sc.addPyFile(path)

В соответствии с pyspark документация здесь

Добавьте зависимости.py или.zip для всех задач, которые будут выполняться в этом SparkContext в будущем. Переданный путь может быть либо локальным файлом, файлом в HDFS (или другими файловыми системами, поддерживаемыми Hadoop), либо URI HTTP, HTTPS или FTP.

Попробуйте загрузить файл модуля python в общедоступное облачное хранилище (например, AWS S3) и передать URL-адрес этому методу.

Вот более полный материал для чтения: http://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_python.html

Попробуйте импортировать свой пользовательский модуль из самого метода, а не из верхней части скрипта драйвера, например:

def parse_record(record):
    import parser
    p = parser.parse(record)
    return p

скорее, чем

import parser
def parse_record(record):
    p = parser.parse(record)
    return p

Cloud Pickle, похоже, не распознает, когда пользовательский модуль был импортирован, поэтому он пытается выбрать модули верхнего уровня вместе с другими данными, необходимыми для запуска метода. По моему опыту, это означает, что модули верхнего уровня, кажется, существуют, но у них нет используемых членов, и вложенные модули не могут быть использованы, как ожидается. Однажды либо импортируй с from A import * или изнутри метода (import A.B), модули работали как положено.

Похоже, что один или несколько узлов не настроены должным образом. Все ли узлы в кластере имеют одинаковую версию / конфигурацию Python (т. Е. Все они имеют установленный модуль синтаксического анализатора)?

Если вы не хотите проверять одну за другой, вы можете написать скрипт, чтобы проверить, установлен ли он, или установить его для вас. В этой теме показано несколько способов сделать это.

PySpark в EMR по умолчанию настроен для Python 2.6, поэтому убедитесь, что он не устанавливается для интерпретатора Python 2.7

Вам необходимо упаковать свой код Python, используя такие инструменты, как setuptools. Это позволит вам создать файл.egg, который похож на файл java jar. Затем вы можете указать путь к этому файлу с помощью --py-files

spark-submit --py-files path_to_egg_file path_to_spark_driver_file

Создайте zip-файлы (example- abc.zip), содержащие все ваши зависимости.

При создании контекста spark укажите имя файла zip как:

    sc = SparkContext(conf=conf, pyFiles=["abc.zip"])

Я столкнулся с подобной проблемой, мои рабочие узлы не могли обнаружить модули, даже если я использовал --py-files переключатель.

Я сделал пару вещей: сначала я попытался поместить оператор import после того, как создал переменную SparkContext (sc), надеясь, что импорт должен произойти после того, как модуль будет отправлен на все узлы, но все же он не работает. Я тогда попробовал sc.addFile добавить модуль в сам скрипт (вместо отправки его в качестве аргумента командной строки) и затем импортировать функции модуля. Это добилось цели по крайней мере в моем случае.

Другие вопросы по тегам