Автоматическое тестирование PySpark с использованием тестов носа

У меня проблемы с настройкой автоматического тестирования Spark с помощью тестов носа. Я думаю, что это потому, что исполнитель Spark не знает о __init__.py файлы, поэтому я также попытался добавить src/spark/main.py с помощью addPyFiles в SparkContext, не повезло.

Traceback (most recent call last):
  File "../src/spark/tests/test_main.py", line 64, in testfirstFileMapper
    .map(main.firstFileMapper).collect()
  File "spark-1.2.0/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
Py4JJavaError: An error occurred while calling o33.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "spark-1.2.0/python/pyspark/worker.py", line 90, in main
    command = pickleSer._read_with_length(infile)
  File "spark-1.2.0/python/pyspark/serializers.py", line 151, in _read_with_length
    return self.loads(obj)
  File "spark-1.2.0/python/pyspark/serializers.py", line 396, in loads
    return cPickle.loads(obj)
ImportError: No module named spark.main

Моя структура проекта выглядит примерно так:

src/
├── scripts
└── spark
    ├── __init__.py
    ├── main.py
    └── tests
        ├── sample_data
        │   ├── 1.txt
        │   ├── 2.txt
        │   ├── 3.txt
        │   ├── 4.txt
        │   └── 5.txt
        ├── __init__.py
        └── test_main.py

src/tests/__init__.py используется для установки py4j на путь сборки Python и успешно создает SparkContext.

pathToPy4j = glob.glob(os.path.join(os.environ['SPARK_HOME'], "python", "lib", "py4j-*-src.zip"))[0]
sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python"))
sys.path.append(pathToPy4j)

внутри src/spark/tests/test_main.py Я пытаюсь импортировать все функции, определенные в src / spark / main.py, это что-то вроде строки:

###.... other imports ###
from spark import main 
class ReusablePySparkContext(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        conf = SparkConf().setAppName(cls.__name__) \
                          .setMaster("local") \
                          .set("log4j.rootCategory", "WARN, console")
        cls.sc = SparkContext(conf=conf)
        cls.sc.addPyFile(os.path.abspath(main.__file__))

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()


class InputMapperTests(ReusablePySparkContext):

    @classmethod
    def setUpClass(cls):
        ReusablePySparkContext.setUpClass()
        baseTestDirectory = os.path.dirname(os.path.abspath(__file__))

        cls.firstFile  = os.path.join(baseTestDirectory, "sample_data/1.txt")
        cls.secondFile = os.path.join(baseTestDirectory, "sample_data/2.txt")
        cls.thirdFile  = os.path.join(baseTestDirectory, "sample_data/3.txt")
        cls.forthFile  = os.path.join(baseTestDirectory, "sample_data/4.txt")
        cls.fifthFile  = os.path.join(baseTestDirectory, "sample_data/5.txt")

    @classmethod
    def tearDownClass(cls):
        #not important right now

    def testFirstFileMapper(self):
        localPathToSample = "file:///" + self.firstFile
        firstFileResults = self.sc.newAPIHadoopFile(
                                  path=localPathToSample,
                                  inputFormatClass="org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                                  keyClass="org.apache.hadoop.io.Text",
                                  valueClass="org.apache.hadoop.io.Text") \
                           .filter(lambda (key, value): key != 0) \
                           .map(main.firstFileMapper).collect()

        assert len(self.firstFileResults) == 3, "First sample should return 3 parsed elements"
        assert self.firstFileResults[0][0][0] == "XXXXXXXXX", "Field X is not parsed correctly"
        assert len(self.firstFileResults[0][1]) == 14, "firstFile values should be equal to 14"

Я пытался использовать относительный импорт, псевдонимы, но все равно не смог заставить его работать, я что-то упустил?

Также, если я вызываю main.firstFileMapper() перед установкой, функция вызывается правильно.

0 ответов

Другие вопросы по тегам