Как сохранить огромный массив данных pandas в формате hdf?
Я работаю с пандами и со свечами. Кадры данных всегда очень большие (> 20 ГБ), и стандартных функций зажигания недостаточно для этих размеров. В настоящее время я конвертирую мой pandas dataframe в искровой dataframe, например так:
dataframe = spark.createDataFrame(pandas_dataframe)
Я делаю это преобразование, потому что с помощью spark записывать кадры в hdfs очень просто:
dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
Но преобразование не выполняется для информационных кадров, размер которых превышает 2 ГБ. Если я преобразую фрейм данных spark в панды, я могу использовать pyarrow:
// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")
// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)
// delete temp files
hdfs.delete(path, recursive=True)
Это быстрый разговор от искры до панд, и он также работает для информационных кадров больше 2 ГБ. Я пока не мог найти способ сделать это наоборот. Имеется в виду наличие информационного кадра панд, который я преобразую, чтобы зажечь с помощью пиарроу. Проблема в том, что я действительно не могу найти, как записать pandas dataframe в hdfs.
Моя версия панд: 0.19.0
3 ответа
Имеется в виду наличие информационного кадра панд, который я преобразую, чтобы зажечь с помощью пиарроу.
pyarrow.Table.fromPandas
это функция, которую вы ищете:
Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True) Convert pandas.DataFrame to an Arrow Table
import pyarrow as pa
pdf = ... # type: pandas.core.frame.DataFrame
adf = pa.Table.from_pandas(pdf) # type: pyarrow.lib.Table
Результат может быть записан непосредственно в Parquet / HDFS без передачи данных через Spark:
import pyarrow.parquet as pq
fs = pa.hdfs.connect()
with fs.open(path, "wb") as fw
pq.write_table(adf, fw)
Смотрите также
- Wes McKinney ответ для чтения паркетных файлов из HDFS с использованием PyArrow.
- Чтение и запись формата Apache Parquet в
pyarrow
документация - Подключение к собственной файловой системе Hadoop (HDFS) в Python
Искровые ноты:
Кроме того, поскольку Spark 2.3 (текущий мастер) Arrow поддерживается непосредственно вcreateDataFrame
( SPARK-20791 - Используйте Apache Arrow для улучшения Spark createDataFrame из Pandas.DataFrame). Он используетSparkContext.defaultParallelism
рассчитать количество кусков, чтобы вы могли легко контролировать размер отдельных партий.
в заключение defaultParallelism
может использоваться для контроля количества разделов, генерируемых с использованием стандартных _convert_from_pandas
, эффективно уменьшая размер ломтиков до чего-то более управляемого.
К сожалению, это вряд ли решит ваши текущие проблемы с памятью. Оба зависят от parallelize
, поэтому храните все данные в памяти узла драйвера. Переключение на стрелку или изменение конфигурации может только ускорить ограничение размера процесса или адресного блока.
На практике я не вижу никакой причины переключаться на Spark, если вы используете местные панды DataFrame
в качестве входа. Самым серьезным узким местом в этом сценарии является сетевой ввод / вывод драйвера, и распространение данных не решит эту проблему.
С https://issues.apache.org/jira/browse/SPARK-6235
Поддержка распараллеливания R data.frame размером более 2 ГБ
решено.
С https://pandas.pydata.org/pandas-docs/stable/r_interface.html
Преобразование DataFrames в объекты R
вы можете преобразовать pandas dataframe в R data.frame
Так что, возможно, преобразование панд -> R -> Spark -> hdfs?
Еще один способ - преобразовать ваш pandas dataframe в spark dataframe (используя pyspark) и сохранить его в hdfs с помощью команды save. пример
df = pd.read_csv("data/as/foo.csv")
df[['Col1', 'Col2']] = df[['Col2', 'Col2']].astype(str)
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(df)
Вот astype
изменяет тип вашего столбца с object
в string
, Это спасает вас от исключений, возникающих в противном случае, так как искра не может определить тип панд object
, Но убедитесь, что эти столбцы действительно имеют строковый тип.
Теперь, чтобы сохранить ваш файл в формате hdf:
sdf.write.csv('mycsv.csv')
Хаком может быть создание N-панда данных (каждый размером менее 2 ГБ) (горизонтальное разделение) из большого и создание N различных искровых фреймов данных, а затем объединение (объединение) их для создания окончательного для записи в HDFS. Я предполагаю, что ваша главная машина мощная, но у вас также есть кластер, в котором вы работаете с Spark.