Как предотвратить утечку памяти при тестировании с HiveContext в PySpark
Я использую pyspark для некоторой обработки данных и использую HiveContext для оконной функции.
Чтобы протестировать код, я использую TestHiveContext, в основном копируя реализацию из исходного кода pyspark:
https://spark.apache.org/docs/preview/api/python/_modules/pyspark/sql/context.html
@classmethod
def _createForTesting(cls, sparkContext):
"""(Internal use only) Create a new HiveContext for testing.
All test code that touches HiveContext *must* go through this method. Otherwise,
you may end up launching multiple derby instances and encounter with incredibly
confusing error messages.
"""
jsc = sparkContext._jsc.sc()
jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc)
return cls(sparkContext, jtestHive)
Затем мои тесты наследуют базовый класс, который может получить доступ к контексту.
Некоторое время это работало нормально. Тем не менее, я начал замечать, что у некоторых нерегулярных процессов заканчиваются проблемы с памятью, когда я добавляю больше тестов. Теперь я не могу запустить тестовый набор без сбоев.
"java.lang.OutOfMemoryError: Java heap space"
Я явно останавливаю контекст spark после каждого запуска теста, но это не убивает HiveContext. Таким образом, я считаю, что он создает новые HiveContexts каждый раз, когда запускается новый тест, и не удаляет старый, что приводит к утечке памяти.
Любые предложения о том, как разрушить базовый класс, чтобы он убил HiveContext?
1 ответ
Если вы счастливы использовать одноэлементное хранилище контекста Spark/Hive во всех своих тестах, вы можете сделать что-то вроде следующего.
test_contexts.py:
_test_spark = None
_test_hive = None
def get_test_spark():
if _test_spark is None:
# Create spark context for tests.
# Not really sure what's involved here for Python.
_test_spark = ...
return _test_spark
def get_test_hive():
if _test_hive is None:
sc = get_test_spark()
jsc = test_spark._jsc.sc()
_test_hive = sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc)
return _test_hive
И тогда вы просто импортируете эти функции в свои тесты.
my_test.py:
from test_contexts import get_test_spark, get_test_hive
def test_some_spark_thing():
sc = get_test_spark()
sqlContext = get_test_hive()
# etc