Как предотвратить утечку памяти при тестировании с HiveContext в PySpark

Я использую pyspark для некоторой обработки данных и использую HiveContext для оконной функции.

Чтобы протестировать код, я использую TestHiveContext, в основном копируя реализацию из исходного кода pyspark:

https://spark.apache.org/docs/preview/api/python/_modules/pyspark/sql/context.html

@classmethod
def _createForTesting(cls, sparkContext):
    """(Internal use only) Create a new HiveContext for testing.

    All test code that touches HiveContext *must* go through this method. Otherwise,
    you may end up launching multiple derby instances and encounter with incredibly
    confusing error messages.
    """
    jsc = sparkContext._jsc.sc()
    jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc)
    return cls(sparkContext, jtestHive)

Затем мои тесты наследуют базовый класс, который может получить доступ к контексту.

Некоторое время это работало нормально. Тем не менее, я начал замечать, что у некоторых нерегулярных процессов заканчиваются проблемы с памятью, когда я добавляю больше тестов. Теперь я не могу запустить тестовый набор без сбоев.

"java.lang.OutOfMemoryError: Java heap space"

Я явно останавливаю контекст spark после каждого запуска теста, но это не убивает HiveContext. Таким образом, я считаю, что он создает новые HiveContexts каждый раз, когда запускается новый тест, и не удаляет старый, что приводит к утечке памяти.

Любые предложения о том, как разрушить базовый класс, чтобы он убил HiveContext?

1 ответ

Решение

Если вы счастливы использовать одноэлементное хранилище контекста Spark/Hive во всех своих тестах, вы можете сделать что-то вроде следующего.

test_contexts.py:

_test_spark = None
_test_hive = None

def get_test_spark():
    if _test_spark is None:
        # Create spark context for tests.
        # Not really sure what's involved here for Python.
        _test_spark = ...
    return _test_spark

def get_test_hive():
    if _test_hive is None:
        sc = get_test_spark()
        jsc = test_spark._jsc.sc()
        _test_hive = sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc)
    return _test_hive

И тогда вы просто импортируете эти функции в свои тесты.

my_test.py:

from test_contexts import get_test_spark, get_test_hive

def test_some_spark_thing():
    sc = get_test_spark()
    sqlContext = get_test_hive()
    # etc
Другие вопросы по тегам