Как читать разделенные файлы паркета из S3, используя pyarrow в python
Я ищу способы чтения данных из нескольких многораздельных каталогов из S3, используя Python.
data_folder / serial_number = 1 / cur_date = 20-12-2012 / abcdsd0324324.snappy.parquet data_folder / serial_number = 2 / cur_date = 27-12-2012 / asdsdfsd0324324.snappy.parquet
Модуль ParquetDataset в pyarrow может считывать данные с разделов. Итак, я попробовал следующий код:
>>> import pandas as pd
>>> import pyarrow.parquet as pq
>>> import s3fs
>>> a = "s3://my_bucker/path/to/data_folder/"
>>> dataset = pq.ParquetDataset(a)
Это бросило следующую ошибку:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
self.metadata_path) = _make_manifest(path_or_paths, self.fs)
File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
.format(path))
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/
Основываясь на документации pyarrow, я попытался использовать s3fs в качестве файловой системы, а именно:
>>> dataset = pq.ParquetDataset(a,filesystem=s3fs)
Который выдает следующую ошибку:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
self.metadata_path) = _make_manifest(path_or_paths, self.fs)
File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
if is_string(path_or_paths) and fs.isdir(path_or_paths):
AttributeError: module 's3fs' has no attribute 'isdir'
Я ограничен в использовании кластера ECS, поэтому spark/pyspark не вариант.
Есть ли способ, с помощью которого мы можем легко прочитать файлы паркета в Python из таких разделенных каталогов в s3? Я чувствую, что перечисление всех каталогов и последующее чтение не является хорошей практикой, как предлагается в этой ссылке. Мне нужно было бы преобразовать прочитанные данные в фрейм данных Pandas для дальнейшей обработки и, следовательно, предпочесть опции, связанные с fastparquet или pyarrow. Я открыт для других вариантов в Python, а также.
6 ответов
Мне удалось заставить это работать с последней версией fastparquet & s3fs. Ниже приведен код для того же:
import s3fs
import fastparquet as fp
s3 = s3fs.S3FileSystem()
fs = s3fs.core.S3FileSystem()
#mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)
myopen = s3.open
#use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
#convert to pandas dataframe
df = fp_obj.to_pandas()
кредиты Мартин за указание мне в правильном направлении через наш разговор
NB: Это будет медленнее, чем использование pyarrow, основываясь на тесте. Я обновлю свой ответ, как только поддержка s3fs будет реализована в pyarrow через ARROW-1213
Я сделал быстрый тест на индивидуальные итерации с помощью pyarrow и списка файлов, отправленных как fastparquet. fastparquet быстрее с s3fs против pyarrow + мой хакерский код. Но я считаю, что pyarrow +s3fs будет быстрее после внедрения.
Код и тесты ниже:
>>> def test_pq():
... for current_file in list_parquet_files:
... f = fs.open(current_file)
... df = pq.read_table(f).to_pandas()
... # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
... #probably not the best way to split :)
... elements_list=current_file.split('/')
... for item in elements_list:
... if item.find(date_partition) != -1:
... current_date = item.split('=')[1]
... elif item.find(dma_partition) != -1:
... current_dma = item.split('=')[1]
... df['serial_number'] = current_dma
... df['cur_date'] = current_date
... list_.append(df)
... frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
12.078817503992468
>>> def test_fp():
... fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
... df = fp_obj.to_pandas()
>>> timeit.timeit('test_fp',number =10,globals=globals())
2.0100269466638565e-06
Ссылка:
Для python 3.6+ AWS имеет библиотеку под названием aws-data-wrangler, которая помогает с интеграцией между Pandas/S3/Parquet.
установить делать;
pip install awswrangler
читать паркет из s3 с помощью awswrangler 1.x.x
и выше делайте;
import awswrangler as wr
df = wr.s3.read_parquet(path="s3://my_bucket/path/to/data_folder/", dataset=True)
Установив dataset=True
awswrangler ожидает паркетных файлов с разделами. Он будет читать все отдельные файлы паркета с разных разделов ниже ключа s3, который вы укажете вpath
.
Для тех из вас, кто хочет читать только части разделенного паркетного файла, pyarrow принимает список ключей, а также только частичный путь к каталогу для чтения во всех частях раздела. Этот метод особенно полезен для организаций, которые разбили свои наборы данных паркета по значимому, например, по году или стране, позволяя пользователям указывать, какие части файла им нужны. Это снизит затраты в долгосрочной перспективе, поскольку AWS взимает плату за байт при чтении наборов данных.
# Read in user specified partitions of a partitioned parquet file
import s3fs
import pyarrow.parquet as pq
s3 = s3fs.S3FileSystem()
keys = ['keyname/blah_blah/part-00000-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'\
,'keyname/blah_blah/part-00001-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'\
,'keyname/blah_blah/part-00002-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'\
,'keyname/blah_blah/part-00003-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet']
bucket = 'bucket_yada_yada_yada'
# Add s3 prefix and bucket name to all keys in list
parq_list=[]
for key in keys:
parq_list.append('s3://'+bucket+'/'+key)
# Create your dataframe
df = pq.ParquetDataset(parq_list, filesystem=s3).read_pandas(columns=['Var1','Var2','Var3']).to_pandas()
Эта проблема была решена в этом запросе в 2017 году.
Для тех, кто хочет читать паркет из S3, используя только пиарроу, вот пример:
import s3fs
import pyarrow.parquet as pq
from pyarrow.filesystem import S3FSWrapper
fs = s3fs.S3FileSystem()
bucket = "your-bucket"
path = "your-path"
# Python 3.6 or later
p_dataset = pq.ParquetDataset(
f"s3://{bucket}/{path}",
filesystem=fs
)
df = p_dataset.read().to_pandas()
# Pre-python 3.6
p_dataset = pq.ParquetDataset(
"s3://{0}/{1}".format(bucket, path),
filesystem=fs
)
df = p_dataset.read().to_pandas()
В PyArrow 7.0.0 есть некоторые улучшения нового модуля,pyarrow.dataset
, предназначенный для абстрагирования концепции набора данных от предыдущей, специфичной для Parquet
pyarrow.parquet.ParquetDataset
.
Предполагая, что вы согласны со схемой набора данных, полученной из первого файла, пример из документации для чтения секционированного набора данных должен просто работать.
Вот более полный пример, предполагающий, что вы хотите использовать данные из S3:
import pyarrow.dataset as ds
from pyarrow import fs
s3 = fs.S3FileSystem()
dataset = ds.dataset(
"my-bucket-name/my-path-to-dataset-partitions",
format="parquet",
filesystem=s3,
partitioning="hive"
)
# Assuming your data is partitioned like year=2022/month=4/day=29
# this will only have to read the files for that day
expression = ((ds.field("year") == 2022) & (ds.field("month") == 4) & (ds.field("day") == 29))
pyarrow_table_2022_04_29 = dataset.to_table(filter=expression)
Слово предупреждения, если вы определяете схему набора данных самостоятельно. Этот вывод выше с аргументом раздела автоматически добавляет разделы в вашу схему набора данных .
Если вы хотите, чтобы секционирование правильно работало с определенной вручную схемой набора данных, вы должны убедиться, что добавили секции в схему:
import pyarrow as pa
my_manual_schema = pa.schema([]) # Some pyarrow.Schema instance for your dataset
# Be sure to add the partitions even though they're not in the dataset files
my_manual_schema.append(pa.field("year", pa.int16()))
my_manual_schema.append(pa.field("month", pa.int8()))
my_manual_schema.append(pa.field("day", pa.int8()))
dataset = ds.dataset(
"my-bucket-name/my-path-to-dataset-partitions",
format="parquet",
filesystem=s3,
schema=my_manual_schema,
partitioning="hive"
)
Давайте обсудим в https://issues.apache.org/jira/browse/ARROW-1213 и https://issues.apache.org/jira/browse/ARROW-1119. Мы должны добавить некоторый код, чтобы pyarrow мог распознавать файловую систему s3fs, и добавить класс совместимости / совместимости для соответствия немного отличающемуся API файловой системы S3FS от pyarrow.