Автоматическое тестирование PySpark с использованием тестов носа
У меня проблемы с настройкой автоматического тестирования Spark с помощью тестов носа. Я думаю, что это потому, что исполнитель Spark не знает о __init__.py
файлы, поэтому я также попытался добавить src/spark/main.py
с помощью addPyFiles
в SparkContext, не повезло.
Traceback (most recent call last):
File "../src/spark/tests/test_main.py", line 64, in testfirstFileMapper
.map(main.firstFileMapper).collect()
File "spark-1.2.0/python/pyspark/rdd.py", line 676, in collect
bytesInJava = self._jrdd.collect().iterator()
File "spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
self.target_id, self.name)
File "spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
format(target_id, '.', name), value)
Py4JJavaError: An error occurred while calling o33.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "spark-1.2.0/python/pyspark/worker.py", line 90, in main
command = pickleSer._read_with_length(infile)
File "spark-1.2.0/python/pyspark/serializers.py", line 151, in _read_with_length
return self.loads(obj)
File "spark-1.2.0/python/pyspark/serializers.py", line 396, in loads
return cPickle.loads(obj)
ImportError: No module named spark.main
Моя структура проекта выглядит примерно так:
src/
├── scripts
└── spark
├── __init__.py
├── main.py
└── tests
├── sample_data
│ ├── 1.txt
│ ├── 2.txt
│ ├── 3.txt
│ ├── 4.txt
│ └── 5.txt
├── __init__.py
└── test_main.py
src/tests/__init__.py
используется для установки py4j на путь сборки Python и успешно создает SparkContext.
pathToPy4j = glob.glob(os.path.join(os.environ['SPARK_HOME'], "python", "lib", "py4j-*-src.zip"))[0]
sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python"))
sys.path.append(pathToPy4j)
внутри src/spark/tests/test_main.py
Я пытаюсь импортировать все функции, определенные в src / spark / main.py, это что-то вроде строки:
###.... other imports ###
from spark import main
class ReusablePySparkContext(unittest.TestCase):
@classmethod
def setUpClass(cls):
conf = SparkConf().setAppName(cls.__name__) \
.setMaster("local") \
.set("log4j.rootCategory", "WARN, console")
cls.sc = SparkContext(conf=conf)
cls.sc.addPyFile(os.path.abspath(main.__file__))
@classmethod
def tearDownClass(cls):
cls.sc.stop()
class InputMapperTests(ReusablePySparkContext):
@classmethod
def setUpClass(cls):
ReusablePySparkContext.setUpClass()
baseTestDirectory = os.path.dirname(os.path.abspath(__file__))
cls.firstFile = os.path.join(baseTestDirectory, "sample_data/1.txt")
cls.secondFile = os.path.join(baseTestDirectory, "sample_data/2.txt")
cls.thirdFile = os.path.join(baseTestDirectory, "sample_data/3.txt")
cls.forthFile = os.path.join(baseTestDirectory, "sample_data/4.txt")
cls.fifthFile = os.path.join(baseTestDirectory, "sample_data/5.txt")
@classmethod
def tearDownClass(cls):
#not important right now
def testFirstFileMapper(self):
localPathToSample = "file:///" + self.firstFile
firstFileResults = self.sc.newAPIHadoopFile(
path=localPathToSample,
inputFormatClass="org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
keyClass="org.apache.hadoop.io.Text",
valueClass="org.apache.hadoop.io.Text") \
.filter(lambda (key, value): key != 0) \
.map(main.firstFileMapper).collect()
assert len(self.firstFileResults) == 3, "First sample should return 3 parsed elements"
assert self.firstFileResults[0][0][0] == "XXXXXXXXX", "Field X is not parsed correctly"
assert len(self.firstFileResults[0][1]) == 14, "firstFile values should be equal to 14"
Я пытался использовать относительный импорт, псевдонимы, но все равно не смог заставить его работать, я что-то упустил?
Также, если я вызываю main.firstFileMapper() перед установкой, функция вызывается правильно.