Оптимальный способ хранения данных от панд до снежинки
Фрейм данных огромен (7-8 миллионов строк). Пробовал to_sql с chunksize = 5000, но он так и не закончился.
С помощью,
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
df.to_sql(snowflake_table , engine, if_exists='replace', index=False, index_label=None, chunksize=20000)
Каковы другие оптимальные решения для хранения данных в SF от Pandas DF? Или что я тут не так делаю? Размер DF обычно составляет 7-10 миллионов строк.
6 ответов
Оптимальный способ, на который указал ilja-everila, - это "скопировать в...", поскольку SF требовал, чтобы csv был помещен в облако перед преобразованием. Я не решался это сделать, но, похоже, это единственный вариант, учитывая, что производительность находится в 5-10 минут для 6,5 миллионов записей.
Снежинка обеспечивает
write_pandas
а также
pd_writer
вспомогательные функции для управления этим:
from snowflake.connector.pandas_tools import pd_writer
df.to_sql(snowflake_table, engine, index=False, method=pd_writer)
# ^ here
В
pd_writer()
функция использует
write_pandas()
:
write_pandas(): записывает кадр данных Pandas в таблицу в базе данных Snowflake.
Для записи данных в таблицу функция сохраняет данные в файлы Parquet, использует команду PUT для загрузки этих файлов на временную сцену и использует команду COPY INTO для копирования данных из файлов в таблицу.
Наименее болезненный способ, который я могу представить, - это сбросить файл в S3
и Snowpipe загрузит его в Snowflake автоматически. С такой настройкой вам вообще не нужно выполнять какие-либо команды копирования или делать какие-либо вызовы Snowflake.
См. Документацию Snowflake для получения подробной информации о том, как настроить Snowpipe для S3. Короче говоря, вам нужно создать этап, целевую таблицу, формат файла (хотя я думаю, у вас уже есть эти вещи) и канал. Затем настройте уведомления SQS для вашего сегмента, которые будет прослушивать канал.
Snowflake предлагает файлы размером около 10–100 МБ, поэтому, вероятно, будет неплохо разделить файл.
# set up credentials (s3fs is built on BOTO hence this is AWS specific)
fs = s3fs.S3FileSystem(key=key, secret=secret)
# number of files to split into
n_chunks = 2
# loop over dataframe and dump chunk by chunk to S3
# (you likely want to expand file naming logic to avoid overwriting existing files)
for f_name, chunks in enumerate(np.array_split(np.arange(df.shape[0]), n_chunks)):
bytes_to_write = df.iloc[chunks].to_csv(index=False).encode()
with fs.open('s3://mybucket/test/dummy_{}.csv'.format(f_name), 'wb') as f:
f.write(bytes_to_write)
Для справки я попробовал это с 7-мегабайтным фреймом данных, разделенным на 5 файлов размером около 40 МБ. От начала разделения фрейма данных до того, как все строки были доставлены в Snowflake, потребовалось около 3 минут 40 секунд.
Если вы используете SQLAlchemy, вы можете попробовать установить параметрmethod='multi'
df.to_sql('table_name',
con=con,
index=False,
if_exists='append',
schema=schema,
method='multi',
chunksize=10000)
Для использования SQLAlchemy, вы могли бы также добавить, в параметре соединения, paramstyle=qmark
это связывает данные. На это также ссылаются здесь: https://github.com/snowflakedb/snowflake-connector-python/issues/37
После этого изменения, если вы считаете нужным, может быть хорошей идеей сделать сравнение производительности между подходом SQLAlchemy и подходом массовой загрузки для записи большого DF в файлы и использовать COPY INTO для загрузки файлов в таблицу Snowflake.
Pandas выполняет "вставить в..." с несколькими значениями за сценой. Snowflake имеет ограничение на прием до 16384 записей. Измените размер блока =16384.