Как прочитать список файлов паркета из S3 в виде фрейма данных pandas, используя pyarrow?

У меня есть хакерский способ добиться этого с помощью boto3 (1.4.4), pyarrow (0.4.1) и pandas (0.20.3).

Во-первых, я могу прочитать один файл паркет локально, как это:

import pyarrow.parquet as pq

path = 'parquet/part-r-00000-1e638be4-e31f-498a-a359-47d017a0059c.gz.parquet'
table = pq.read_table(path)
df = table.to_pandas()

Я также могу прочитать каталог файлов паркет локально, как это:

import pyarrow.parquet as pq

dataset = pq.ParquetDataset('parquet/')
table = dataset.read()
df = table.to_pandas()

Оба работают как шарм. Теперь я хочу добиться того же удаленно с файлами, хранящимися в корзине S3. Я надеялся, что что-то вроде этого будет работать:

dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket')

Но это не так:

OSError: Passed non-file path: s3n://dsn/to/my/bucket

После тщательного прочтения документации Pyarrow, в данный момент это кажется невозможным. Поэтому я предложил следующее решение:

Чтение отдельного файла с S3 и получение кадра данных pandas:

import io
import boto3
import pyarrow.parquet as pq

buffer = io.BytesIO()
s3 = boto3.resource('s3')
s3_object = s3.Object('bucket-name', 'key/to/parquet/file.gz.parquet')
s3_object.download_fileobj(buffer)
table = pq.read_table(buffer)
df = table.to_pandas()

И вот мое хакерское, не очень оптимизированное решение для создания pandas dataframe из пути к папке S3:

import io
import boto3
import pandas as pd
import pyarrow.parquet as pq

bucket_name = 'bucket-name'
def download_s3_parquet_file(s3, bucket, key):
    buffer = io.BytesIO()
    s3.Object(bucket, key).download_fileobj(buffer)
    return buffer

client = boto3.client('s3')
s3 = boto3.resource('s3')
objects_dict = client.list_objects_v2(Bucket=bucket_name, Prefix='my/folder/prefix')
s3_keys = [item['Key'] for item in objects_dict['Contents'] if item['Key'].endswith('.parquet')]
buffers = [download_s3_parquet_file(s3, bucket_name, key) for key in s3_keys]
dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers]
df = pd.concat(dfs, ignore_index=True)

Есть ли лучший способ добиться этого? Может быть, какой-то разъем для панд, использующих пиарроу? Я хотел бы избежать использования pyspark, но если нет другого решения, то я бы его взял.

9 ответов

Решение

Вы должны использовать s3fs модуль, предложенный yjk21. Однако в результате вызова ParquetDataset вы получите объект pyarrow.parquet.ParquetDataset. Чтобы получить Pandas DataFrame, вы, скорее всего, захотите применить .read_pandas().to_pandas() к нему:

import pyarrow.parquet as pq
import s3fs
s3 = s3fs.S3FileSystem()

pandas_dataframe = pq.ParquetDataset('s3://your-bucket/', filesystem=s3).read_pandas().to_pandas()

Спасибо! Ваш вопрос на самом деле сказать мне много. Вот как я делаю это сейчас с pandas (0.21.1), который вызовет pyarrow, а также boto3 (1.3.1).

import boto3
import io
import pandas as pd

# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, s3_client=None, **args):
    if s3_client is None:
        s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)

# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(filepath, bucket, s3=None, 
                                 s3_client=None, verbose=False, **args):
    if not filepath.endswith('/'):
        filepath = filepath + '/'  # Add '/' to the end
    if s3_client is None:
        s3_client = boto3.client('s3')
    if s3 is None:
        s3 = boto3.resource('s3')
    s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)
               if item.key.endswith('.parquet')]
    if not s3_keys:
        print('No parquet found in', bucket, filepath)
    elif verbose:
        print('Load parquets:')
        for p in s3_keys: 
            print(p)
    dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args) 
           for key in s3_keys]
    return pd.concat(dfs, ignore_index=True)

Затем вы можете прочитать несколько паркетов в папке из S3 по

df = pd_read_s3_multiple_parquets('path/to/folder', 'my_bucket')

(Полагаю, можно много упростить этот код.)

При условии, что у вас есть правильная установка пакета

$ pip install pandas==1.1.0 pyarrow==1.0.0 s3fs==0.4.2

и ваши общие файлы конфигурации и учетных данных AWS настроены соответствующим образом

вы можете использовать pandas сразу:

import pandas as pd

df = pd.read_parquet("s3://bucket/key.parquet")

В случае наличия нескольких профилей AWS вам также может потребоваться установить

$ export AWS_DEFAULT_PROFILE=profile_under_which_the_bucket_is_accessible

так что вы можете получить доступ к своему ведру.

Это может быть сделано с помощью boto3, а также без использования Pyarrow

import boto3
import io
import pandas as pd

# Read the parquet file
buffer = io.BytesIO()
s3 = boto3.resource('s3')
object = s3.Object('bucket_name','key')
object.download_fileobj(buffer)
df = pd.read_parquet(buffer)

print(df.head())

Вероятно, самый простой способ считывания данных паркета из облака в кадры данных - это использовать dask.dataframe следующим образом:

import dask.dataframe as dd
df = dd.read_parquet('s3://bucket/path/to/data-*.parq')

dask.dataframe можно читать из Google Cloud Storage, Amazon S3, файловой системы Hadoop и многого другого!

Если вы готовы также использовать AWS Data Wrangler.

import awswrangler as wr

df = wr.s3.read_parquet(path="s3://...")

Вы можете использовать s3fs из dask, который реализует интерфейс файловой системы для s3. Затем вы можете использовать аргумент файловой системы ParquetDataset следующим образом:

import s3fs
s3 = s3fs.S3FileSystem()
dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket', filesystem=s3)

Использование предварительно подписанных URL-адресов

      s3 =s3fs.S3FileSystem(key='your_key',secret='your_secret',client_kwargs={"endpoint_url":'your_end_point'})

df = dd.read_parquet(s3.url('your_bucket' + 'your_filepath',expires=3600,client_method='get_object'))

Я попробовал решение @oya163, и оно работает, но после небольшого изменения

      import boto3
import io
import pandas as pd

# Read the parquet file
buffer = io.BytesIO()
s3 = boto3.resource('s3',aws_access_key_id='123',aws_secret_access_key= '456')
object = s3.Object('bucket_name','myoutput.parquet')
object.download_fileobj(buffer)
df = pd.read_parquet(buffer)

print(df.head())   
Другие вопросы по тегам