Потоковая передача / чанки csv из S3 в Python
Я намерен выполнить некоторые операции с большим объемом памяти над очень большим CSV-файлом, хранящимся в S3, с использованием Python, чтобы переместить скрипт в AWS Lambda. Я знаю, что могу читать всю память csv nto, но я определенно столкнусь с ограничениями памяти и памяти Lambda с таким большим файлом, есть ли какой-либо способ для потоковой передачи или просто чтения кусков csv за раз в Python, используя boto3/botocore, в идеале путем указания номеров строк для чтения?
Вот некоторые вещи, которые я уже пробовал:
1) используя range
параметр в S3.get_object
указать диапазон байтов для чтения. К сожалению, это означает, что последние строки обрезаются посередине, поскольку нет способа указать количество строк для чтения. Есть некоторые грязные обходные пути, такие как сканирование последнего символа новой строки, запись индекс, а затем использовать его в качестве отправной точки для следующего диапазона байтов, но я хотел бы избежать этого неуклюжего решения, если это возможно.
2) Использование S3 select для написания SQL-запросов для выборочного извлечения данных из сегментов S3. К сожалению row_numbers
Функция SQL не поддерживается, и не похоже, что есть способ чтения в подмножестве строк.
3 ответа
Предполагая, что ваш файл не сжат, это должно включать чтение из потока и разбиение на символ новой строки. Чтение фрагмента данных, поиск последнего экземпляра символа новой строки в этом фрагменте, разбиение и обработка.
s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key).read()
# number of bytes to read per chunk
chunk_size = 1000000
# the character that we'll split the data with (bytes, not string)
newline = '\n'.encode()
partial_chunk = b''
while (True):
chunk = partial_chunk + body.read(chunk_size)
last_newline = chunk.rfind(newline)
# write to a smaller file, or work against some piece of data
result = chunk[0:last_newline+1].decode('utf-8')
# keep the partial line you've read here
partial_chunk = chunk[last_newline+1:]
Если у вас есть сжатые файлы, то вам нужно использовать BytesIO
и GzipFile
класс внутри цикла; это более сложная проблема, потому что вам нужно сохранить детали сжатия Gzip.
Я разработал код, похожий на код @Kirk Broadhurst, но таймаут соединения происходил, если время обработки каждого фрагмента превышало 5 минут (примерно). Следующий код работает, открывая новое соединение для каждого фрагмента.
import boto3
import pandas as pd
import numpy as np
# The following credentials should not be hard coded, it's best to get these from cli.
region_name = 'region'
aws_access_key_id = 'aws_access_key_id'
aws_secret_access_key = 'aws_secret_access_key'
s3 =boto3.client('s3',region_name=region_name,aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
obj = s3.get_object(Bucket='bucket', Key='key')
total_bytes = obj['ContentLength']
chunk_bytes = 1024*1024*5 # 5 MB as an example.
floor = int(total_bytes//chunk_bytes)
whole = total_bytes/chunk_bytes
total_chunks = [1+floor if floor<whole else floor][0]
chunk_size_list = [(i*chunk_bytes, (i+1)*chunk_bytes-1) for i in range(total_chunks)]
a,b = chunk_size_list[-1]
b = total_bytes
chunk_size_list[-1] = (a,b)
chunk_size_list = [f'bytes={a}-{b}' for a,b in chunk_size_list]
prev_str = ''
for i,chunk in enumerate(chunk_size_list):
s3 = boto3.client('s3', region_name=region_name, aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key)
byte_obj = s3.get_object(Bucket='bucket', Key='key', Range=chunk_size_list[i])
byte_obj = byte_obj['Body'].read()
str_obj = byte_obj.decode('utf-8')
del byte_obj
list_obj = str_obj.split('\n')
# You can use another delimiter instead of ',' below.
if len(prev_str.split(',')) < len(list_obj[1].split(',')) or len(list_obj[0].split(',')) < len(list_obj[1].split(',')):
list_obj[0] = prev_str+list_obj[0]
else:
list_obj = [prev_str]+list_obj
prev_str = list_obj[-1]
del str_obj, list_obj[-1]
list_of_elements = [st.split(',') for st in list_obj]
del list_obj
df = pd.DataFrame(list_of_elements)
del list_of_elements
gc.collect()
# You can process your pandas dataframe here, but you need to cast it to correct datatypes.
# casting na values to numpy nan type.
na_values = ['', '#N/A', '#N/A N/A', '#NA', '-1.#IND', '-1.#QNAN', '-NaN', '-nan', '1.#IND', '1.#QNAN', 'N/A', 'NA', 'NULL', 'NaN', 'n/a', 'nan', 'null']
df = df.replace(na_values, np.nan)
dtypes = {col1: 'float32', col2:'category'}
df = df.astype(dtype=dtypes, copy=False)
Вы можете использовать методiter_lines
полученного потокового тела для чтения тела построчно без загрузки его в память https://botocore.amazonaws.com/v1/documentation/api/latest/reference/response.html
Вот пример кода:
import boto3
s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']
for line in body.iter_lines():
line = line.decode('utf-8')
... # process the line