Оптимальный способ хранения данных от панд до снежинки

Фрейм данных огромен (7-8 миллионов строк). Пробовал to_sql с chunksize = 5000, но он так и не закончился.

С помощью,

from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

df.to_sql(snowflake_table , engine, if_exists='replace', index=False, index_label=None, chunksize=20000)

Каковы другие оптимальные решения для хранения данных в SF от Pandas DF? Или что я тут не так делаю? Размер DF обычно составляет 7-10 миллионов строк.

6 ответов

Решение

Оптимальный способ, на который указал ilja-everila, - это "скопировать в...", поскольку SF требовал, чтобы csv был помещен в облако перед преобразованием. Я не решался это сделать, но, похоже, это единственный вариант, учитывая, что производительность находится в 5-10 минут для 6,5 миллионов записей.

Снежинка обеспечивает write_pandasа также pd_writerвспомогательные функции для управления этим:

      from snowflake.connector.pandas_tools import pd_writer

df.to_sql(snowflake_table, engine, index=False, method=pd_writer)
#                                                      ^ here

В pd_writer()функция использует write_pandas():

write_pandas(): записывает кадр данных Pandas в таблицу в базе данных Snowflake.

Для записи данных в таблицу функция сохраняет данные в файлы Parquet, использует команду PUT для загрузки этих файлов на временную сцену и использует команду COPY INTO для копирования данных из файлов в таблицу.

Наименее болезненный способ, который я могу представить, - это сбросить файл в S3и Snowpipe загрузит его в Snowflake автоматически. С такой настройкой вам вообще не нужно выполнять какие-либо команды копирования или делать какие-либо вызовы Snowflake.

См. Документацию Snowflake для получения подробной информации о том, как настроить Snowpipe для S3. Короче говоря, вам нужно создать этап, целевую таблицу, формат файла (хотя я думаю, у вас уже есть эти вещи) и канал. Затем настройте уведомления SQS для вашего сегмента, которые будет прослушивать канал.

Snowflake предлагает файлы размером около 10–100 МБ, поэтому, вероятно, будет неплохо разделить файл.

# set up credentials (s3fs is built on BOTO hence this is AWS specific)
fs = s3fs.S3FileSystem(key=key, secret=secret)

# number of files to split into
n_chunks = 2

# loop over dataframe and dump chunk by chunk to S3
# (you likely want to expand file naming logic to avoid overwriting existing files)
for f_name, chunks in enumerate(np.array_split(np.arange(df.shape[0]), n_chunks)):
    bytes_to_write = df.iloc[chunks].to_csv(index=False).encode()
    with fs.open('s3://mybucket/test/dummy_{}.csv'.format(f_name), 'wb') as f:
        f.write(bytes_to_write)

Для справки я попробовал это с 7-мегабайтным фреймом данных, разделенным на 5 файлов размером около 40 МБ. От начала разделения фрейма данных до того, как все строки были доставлены в Snowflake, потребовалось около 3 минут 40 секунд.

Если вы используете SQLAlchemy, вы можете попробовать установить параметрmethod='multi'

      df.to_sql('table_name',
          con=con,
          index=False,
          if_exists='append',
          schema=schema,
          method='multi',
          chunksize=10000)

Для использования SQLAlchemy, вы могли бы также добавить, в параметре соединения, paramstyle=qmark это связывает данные. На это также ссылаются здесь: https://github.com/snowflakedb/snowflake-connector-python/issues/37

После этого изменения, если вы считаете нужным, может быть хорошей идеей сделать сравнение производительности между подходом SQLAlchemy и подходом массовой загрузки для записи большого DF в файлы и использовать COPY INTO для загрузки файлов в таблицу Snowflake.

Pandas выполняет "вставить в..." с несколькими значениями за сценой. Snowflake имеет ограничение на прием до 16384 записей. Измените размер блока =16384.