Создание набора данных для паркета Petastorm через Spark завершается с ошибкой переполнения (больше 4 ГБ)
Я пытаюсь реализовать создание набора данных Uber Petastorm, которое использует Spark для создания файла паркета, следуя инструкциям на их странице Github.
Код:
spark = SparkSession.builder.config('spark.driver.memory', '10g').master('local[4]').getOrCreate()
sc = spark.sparkContext
with materialize_dataset(spark=spark, dataset_url='file:///opt/data/hello_world_dataset',
schema=MySchema, row_group_size_mb=256):
logging.info('Building RDD...')
rows_rdd = sc.parallelize(ids)\
.map(row_generator)\ # Generator that yields lists of examples
.flatMap(lambda x: dict_to_spark_row(MySchema, x))
logging.info('Creating DataFrame...')
spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
.coalesce(10) \
.write \
.mode('overwrite') \
.parquet('file:///opt/data/hello_world_dataset')
Теперь код СДР выполняется успешно, но происходит сбой только .createDataFrame
позвоните со следующей ошибкой:
_pickle.PicklingError: Не удалось сериализовать широковещательную рассылку: OverflowError: невозможно сериализовать строку, размер которой превышает 4 ГБ
Это мой первый опыт работы со Spark, поэтому я не могу точно сказать, происходит ли эта ошибка в Spark или Petastorm.
Просматривая другие решения этой ошибки (в отношении Spark, а не Petastorm), я обнаружил, что это может быть связано с протоколом травления, но я не могу подтвердить это, и при этом я не нашел способа изменить протокол травления.
Как я мог избежать этой ошибки?
1 ответ
Проблема заключается в том, что выполняется выборка для передачи данных между различными процессами, протоколом выбора по умолчанию является 2, и нам нужно использовать 4 для передачи объектов размером более 4 ГБ.
Чтобы изменить протокол протравливания, перед созданием сеанса Spark используйте следующий код
from pyspark import broadcast
import pickle
def broadcast_dump(self, value, f):
pickle.dump(value, f, 4) # was 2, 4 is first protocol supporting >4GB
f.close()
return f.name
broadcast.Broadcast.dump = broadcast_dump
Чтобы построить Bluesummers ответ
Основная ветвь spark прямо сейчас исправляет эту проблему, поэтому я использовал этот код для исправления функции дампа таким же образом, но это немного более безопасно. [тест с 2.3.2]
from pyspark import broadcast
from pyspark.cloudpickle import print_exec
import pickle
def broadcast_dump(self, value, f):
try:
pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
except pickle.PickleError:
raise
except Exception as e:
msg = "Could not serialize broadcast: %s: %s" \
% (e.__class__.__name__, _exception_message(e))
print_exec(sys.stderr)
raise pickle.PicklingError(msg)
f.close()
broadcast.Broadcast.dump = broadcast_dump