Как сохранить огромный массив данных pandas в формате hdf?

Я работаю с пандами и со свечами. Кадры данных всегда очень большие (> 20 ГБ), и стандартных функций зажигания недостаточно для этих размеров. В настоящее время я конвертирую мой pandas dataframe в искровой dataframe, например так:

dataframe = spark.createDataFrame(pandas_dataframe)  

Я делаю это преобразование, потому что с помощью spark записывать кадры в hdfs очень просто:

dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")

Но преобразование не выполняется для информационных кадров, размер которых превышает 2 ГБ. Если я преобразую фрейм данных spark в панды, я могу использовать pyarrow:

// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")

// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)

// delete temp files
hdfs.delete(path, recursive=True)

Это быстрый разговор от искры до панд, и он также работает для информационных кадров больше 2 ГБ. Я пока не мог найти способ сделать это наоборот. Имеется в виду наличие информационного кадра панд, который я преобразую, чтобы зажечь с помощью пиарроу. Проблема в том, что я действительно не могу найти, как записать pandas dataframe в hdfs.

Моя версия панд: 0.19.0

3 ответа

Решение

Имеется в виду наличие информационного кадра панд, который я преобразую, чтобы зажечь с помощью пиарроу.

pyarrow.Table.fromPandasэто функция, которую вы ищете:

Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True)

Convert pandas.DataFrame to an Arrow Table
import pyarrow as pa

pdf = ...  # type: pandas.core.frame.DataFrame
adf = pa.Table.from_pandas(pdf)  # type: pyarrow.lib.Table

Результат может быть записан непосредственно в Parquet / HDFS без передачи данных через Spark:

import pyarrow.parquet as pq

fs  = pa.hdfs.connect()

with fs.open(path, "wb") as fw
    pq.write_table(adf, fw)

Смотрите также

Искровые ноты:

Кроме того, поскольку Spark 2.3 (текущий мастер) Arrow поддерживается непосредственно вcreateDataFrame( SPARK-20791 - Используйте Apache Arrow для улучшения Spark createDataFrame из Pandas.DataFrame). Он используетSparkContext.defaultParallelism рассчитать количество кусков, чтобы вы могли легко контролировать размер отдельных партий.

в заключение defaultParallelism может использоваться для контроля количества разделов, генерируемых с использованием стандартных _convert_from_pandas, эффективно уменьшая размер ломтиков до чего-то более управляемого.

К сожалению, это вряд ли решит ваши текущие проблемы с памятью. Оба зависят от parallelize, поэтому храните все данные в памяти узла драйвера. Переключение на стрелку или изменение конфигурации может только ускорить ограничение размера процесса или адресного блока.

На практике я не вижу никакой причины переключаться на Spark, если вы используете местные панды DataFrame в качестве входа. Самым серьезным узким местом в этом сценарии является сетевой ввод / вывод драйвера, и распространение данных не решит эту проблему.

С https://issues.apache.org/jira/browse/SPARK-6235

Поддержка распараллеливания R data.frame размером более 2 ГБ

решено.

С https://pandas.pydata.org/pandas-docs/stable/r_interface.html

Преобразование DataFrames в объекты R

вы можете преобразовать pandas dataframe в R data.frame

Так что, возможно, преобразование панд -> R -> Spark -> hdfs?

Еще один способ - преобразовать ваш pandas dataframe в spark dataframe (используя pyspark) и сохранить его в hdfs с помощью команды save. пример

    df = pd.read_csv("data/as/foo.csv")
    df[['Col1', 'Col2']] = df[['Col2', 'Col2']].astype(str)
    sc = SparkContext(conf=conf)
    sqlCtx = SQLContext(sc)
    sdf = sqlCtx.createDataFrame(df)

Вот astype изменяет тип вашего столбца с object в string, Это спасает вас от исключений, возникающих в противном случае, так как искра не может определить тип панд object, Но убедитесь, что эти столбцы действительно имеют строковый тип.

Теперь, чтобы сохранить ваш файл в формате hdf:

    sdf.write.csv('mycsv.csv')

Хаком может быть создание N-панда данных (каждый размером менее 2 ГБ) (горизонтальное разделение) из большого и создание N различных искровых фреймов данных, а затем объединение (объединение) их для создания окончательного для записи в HDFS. Я предполагаю, что ваша главная машина мощная, но у вас также есть кластер, в котором вы работаете с Spark.

Другие вопросы по тегам