Как записать DataFrame в таблицу postgres?

Существует метод DataFrame.to_sql, но он работает только для баз данных mysql, sqlite и oracle. Я не могу перейти к этому методу postgres соединение или sqlalchemy двигатель.

8 ответов

Решение

Начиная с версии 0.14 (выпущен в конце мая 2014 года), поддерживается postgresql. sql модуль теперь использует sqlalchemy поддерживать разные вкусы баз данных. Вы можете передать движок sqlalchemy для базы данных postgresql (см. Документацию). Например:

from sqlalchemy import create_engine
engine = create_engine('postgresql://scott:tiger@localhost:5432/mydatabase')
df.to_sql('table_name', engine)

Вы правы, что в pandas до версии 0.13.1 postgresql не поддерживался. Если вам нужно использовать более старую версию панд, вот исправленная версия pandas.io.sql: https://gist.github.com/jorisvandenbossche/10841234.
Я написал это некоторое время назад, поэтому не могу полностью гарантировать, что это всегда работает, но база должна быть там). Если вы поместите этот файл в свой рабочий каталог и импортируете его, то вы сможете это сделать (где con это соединение postgresql):

import sql  # the patched version (file is named sql.py)
sql.write_frame(df, 'table_name', con, flavor='postgresql')

Более быстрый вариант:

Следующий код скопирует ваш DF Pandas в postgres DB намного быстрее, чем метод df.to_sql, и вам не понадобится промежуточный файл csv для хранения df.

Создайте движок на основе ваших спецификаций БД.

Создайте таблицу в своей базе данных postgres, которая будет иметь такое же количество столбцов, что и кадр данных (df).

Данные в DF будут вставлены в вашу таблицу postgres.

from sqlalchemy import create_engine
import psycopg2 
import io

Если вы хотите заменить таблицу, мы можем заменить ее обычным методом to_sql, используя заголовки из нашего df, а затем загрузить всю большую часть времени, требующую df, в БД.

df.head(0).to_sql('table_name', engine,if_exists='replace',index=False) #truncates the table

engine = create_engine('postgresql+psycopg2://username:password@host:port/database')
conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, 'table_name', null="") # null values become ''
conn.commit()

Решение Pandas 0.24.0+

В Pandas 0.24.0 была представлена ​​новая функция, специально разработанная для быстрой записи в Postgres. Вы можете узнать больше об этом здесь: https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html

import csv
from io import StringIO

from sqlalchemy import create_engine

def psql_insert_copy(table, conn, keys, data_iter):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

engine = create_engine('postgresql://myusername:mypassword@myhost:5432/mydatabase')
df.to_sql('table_name', engine, method=psql_insert_copy)

Вот как я это делаю, я могу быть быстрее, потому что он использует execute_batch:

# df is the dataframe
if len(df) > 0:
    df_columns = list(df)
    # create (col1,col2,...)
    columns = ",".join(df_columns)

    # create VALUES('%s', '%s",...) one '%s' per column
    values = "VALUES({})".format(",".join(["%s" for _ in df_columns])) 

    #create INSERT INTO table (columns) VALUES('%s',...)
    insert_stmt = "INSERT INTO {} ({}) {}".format(table,columns,values)

    cur = conn.cursor()
    cur = db_conn.cursor()
    psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
    conn.commit()
    cur.close()

Более быстрый способ записи df в таблицу в пользовательской схеме с/без индекса:

      """
Faster way to write df to table.
Slower way is to use df.to_sql()
"""

from io import StringIO

from pandas import DataFrame
from sqlalchemy.engine.base import Engine


class WriteDfToTableWithIndexMixin:
    @classmethod
    def write_df_to_table_with_index(
            cls,
            df: DataFrame,
            table_name: str,
            schema_name: str,
            engine: Engine
    ):
        """
        Truncate existing table and load df into table.
        Keep each column as string to avoid datatype conflicts.
        """
        df.head(0).to_sql(table_name, engine, if_exists='replace',
                          schema=schema_name, index=True, index_label='id')

        conn = engine.raw_connection()
        cur = conn.cursor()
        output = StringIO()
        df.to_csv(output, sep='\t', header=False,
                  index=True, index_label='id')
        output.seek(0)
        contents = output.getvalue()
        cur.copy_expert(f"COPY {schema_name}.{table_name} FROM STDIN", output)
        conn.commit()


class WriteDfToTableWithoutIndexMixin:
    @classmethod
    def write_df_to_table_without_index(
            cls,
            df: DataFrame,
            table_name: str,
            schema_name: str,
            engine: Engine
    ):
        """
        Truncate existing table and load df into table.
        Keep each column as string to avoid datatype conflicts.
        """
        df.head(0).to_sql(table_name, engine, if_exists='replace',
                          schema=schema_name, index=False)

        conn = engine.raw_connection()
        cur = conn.cursor()
        output = StringIO()
        df.to_csv(output, sep='\t', header=False, index=False)
        output.seek(0)
        contents = output.getvalue()
        cur.copy_expert(f"COPY {schema_name}.{table_name} FROM STDIN", output)
        conn.commit()

Если у вас есть значения JSON в столбце в вашем df, тогда описанный выше метод по-прежнему будет правильно загружать все данные, но столбец json будет иметь какой-то странный формат. Таким образом, преобразование этого столбца json в::jsonможет выдать ошибку. Вы должны использоватьto_sql(). Добавлятьmethod=multiчтобы ускорить процесс и добавитьchunksizeчтобы ваша машина не замерзла:

      df.to_sql(table_name, engine, if_exists='replace', schema=schema_name, index=False, method='multi', chunksize=1000)

используя psycopg2, вы можете использовать собственные команды sql для записи данных в таблицу postgres.

      import psycopg2
import pandas as pd

conn = psycopg2.connect("dbname='{db}' user='{user}' host='{host}' port='{port}' password='{passwd}'".format(
            user=pg_user,
            passwd=pg_pass,
            host=pg_host,
            port=pg_port,
            db=pg_db))
cur = conn.cursor()    
def insertIntoTable(df, table):
        """
        Using cursor.executemany() to insert the dataframe
        """
        # Create a list of tupples from the dataframe values
        tuples = list(set([tuple(x) for x in df.to_numpy()]))
    
        # Comma-separated dataframe columns
        cols = ','.join(list(df.columns))
        # SQL query to execute
        query = "INSERT INTO %s(%s) VALUES(%%s,%%s,%%s,%%s)" % (
            table, cols)
    
        try:
            cur.executemany(query, tuples)
            conn.commit()

        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            return 1

Подключиться к базе данных (dialect = 'postgres' или 'mysql' и т. Д.)

engine = create_engine (f '{dialect}: // {user_name}@{host}:{port / {db_name}', echo = False)

сеанс = создатель сеанса (привязка = двигатель) ()

Читать файл csv в df

df = pd.read_csv (путь + f '/ {файл}')

Вставить df в таблицу

df.to_sql ('table_name', con = engine, if_exists = 'append', index = False)

Для Python 2.7 и Pandas 0.24.2 и с использованием Psycopg2

Модуль подключения Psycopg2

def dbConnect (db_parm, username_parm, host_parm, pw_parm):
    # Parse in connection information
    credentials = {'host': host_parm, 'database': db_parm, 'user': username_parm, 'password': pw_parm}
    conn = psycopg2.connect(**credentials)
    conn.autocommit = True  # auto-commit each entry to the database
    conn.cursor_factory = RealDictCursor
    cur = conn.cursor()
    print ("Connected Successfully to DB: " + str(db_parm) + "@" + str(host_parm))
    return conn, cur

Подключиться к базе данных

conn, cur = dbConnect(databaseName, dbUser, dbHost, dbPwd)

Предполагая, что фрейм данных уже присутствует как df

output = io.BytesIO() # For Python3 use StringIO
df.to_csv(output, sep='\t', header=True, index=False)
output.seek(0) # Required for rewinding the String object
copy_query = "COPY mem_info FROM STDOUT csv DELIMITER '\t' NULL ''  ESCAPE '\\' HEADER "  # Replace your table name in place of mem_info
cur.copy_expert(copy_query, output)
conn.commit()
Другие вопросы по тегам