SQLAlchemy - выполнение массового переноса (если существует, обновление, вставка) в postgresql

Я пытаюсь написать массовый переход в Python с помощью модуля SQLAlchemy (не в SQL!).

Я получаю следующую ошибку при добавлении SQLAlchemy:

sqlalchemy.exc.IntegrityError: (IntegrityError) duplicate key value violates unique constraint "posts_pkey"
DETAIL:  Key (id)=(TEST1234) already exists.

У меня есть стол posts с первичным ключом на id колонка.

В этом примере у меня уже есть строка в БД с id=TEST1234, Когда я пытаюсь db.session.add() новый объект сообщения с id установлен в TEST1234 Я получаю ошибку выше. У меня сложилось впечатление, что если первичный ключ уже существует, запись будет обновлена.

Как я могу использовать Flask-SQLAlchemy только на основе первичного ключа? Есть ли простое решение?

Если нет, я всегда могу проверить и удалить любую запись с соответствующим идентификатором, а затем вставить новую запись, но это кажется дорогим для моей ситуации, когда я не ожидаю много обновлений.

6 ответов

Решение

В SQLAlchemy есть операция upsert-esque:

db.session.merge()

После того, как я нашел эту команду, я смог выполнить upserts, но стоит упомянуть, что эта операция медленная для массового "upsert".

Альтернатива состоит в том, чтобы получить список первичных ключей, которые вы хотели бы сохранить, и запросить базу данных для любых соответствующих идентификаторов:

# Imagine that post1, post5, and post1000 are posts objects with ids 1, 5 and 1000 respectively
# The goal is to "upsert" these posts.
# we initialize a dict which maps id to the post object

my_new_posts = {1: post1, 5: post5, 1000: post1000} 

for each in posts.query.filter(posts.id.in_(my_new_posts.keys())).all():
    # Only merge those posts which already exist in the database
    db.session.merge(my_new_posts.pop(each.id))

# Only add those posts which did not exist in the database 
db.session.add_all(my_new_posts.values())

# Now we commit our modifications (merges) and inserts (adds) to the database!
db.session.commit()

Как вы отметили это с помощью postgresql Я предполагаю, что это то, что вы используете, поэтому вы можете использовать on_conflict_do_updateвариант. Вот простой пример:

class Post(Base):
    """
    A simple class for demonstration
    """

    id = Column(Integer, primary_key=True)
    title = Column(Unicode)

# Prepare all the values that should be "upserted" to the DB
values = [
    {"id": 1, "title": "mytitle 1"},
    {"id": 2, "title": "mytitle 2"},
    {"id": 3, "title": "mytitle 3"},
    {"id": 4, "title": "mytitle 4"},
]

stmt = insert(Post).values(values)
stmt = stmt.on_conflict_do_update(
    # Let's use the constraint name which was visible in the original posts error msg
    constraint="post_pkey",

    # The columns that should be updated on conflict
    set_={
        "title": stmt.excluded.title
    }
)
session.execute(stmt)

См. Дополнительную информацию в документации PG (например, откуда берется термин "исключенный").

Боковое примечание о повторяющихся именах столбцов

В приведенном выше коде имена столбцов используются как ключи dict как в values список и аргумент для set_. Если имя столбца изменено в определении класса, его нужно изменить везде, иначе он сломается. Этого можно избежать, обратившись к определениям столбцов, сделав код немного уродливее, но более надежным:

coldefs = Post.__table__.c

values = [
    {coldefs.id.name: 1, coldefs.title.name: "mytitlte 1"},
    ...
]

...

    set_={
        coldefs.title.name: stmt.excluded.title
...
}

Альтернативный подход с использованием расширения компиляции ( https://docs.sqlalchemy.org/en/13/core/compiler.html):

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def compile_upsert(insert_stmt, compiler, **kwargs):
    """
    converts every SQL insert to an upsert  i.e;
    INSERT INTO test (foo, bar) VALUES (1, 'a')
    becomes:
    INSERT INTO test (foo, bar) VALUES (1, 'a') ON CONFLICT(foo) DO UPDATE SET (bar = EXCLUDED.bar)
    (assuming foo is a primary key)
    :param insert_stmt: Original insert statement
    :param compiler: SQL Compiler
    :param kwargs: optional arguments
    :return: upsert statement
    """
    pk = insert_stmt.table.primary_key
    insert = compiler.visit_insert(insert_stmt, **kwargs)
    ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
    updates = ', '.join(f"{c.name}=EXCLUDED.{c.name}" for c in insert_stmt.table.columns)
    upsert = ' '.join((insert, ondup, updates))
    return upsert

Это должно гарантировать, что все операторы вставки будут вести себя как восходящие. Эта реализация находится на диалекте Postgres, но ее довольно легко изменить для диалекта MySQL.

Я начал смотреть на это и думаю, что нашел довольно эффективный способ делать upserts в sqlalchemy с сочетанием bulk_insert_mappings а также bulk_update_mappings вместо merge.

      import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from contextlib import contextmanager


engine = None
Session = sessionmaker()
Base = declarative_base()


def creat_new_database(db_name="sqlite:///bulk_upsert_sqlalchemy.db"):
    global engine
    engine = create_engine(db_name, echo=False)
    local_session = scoped_session(Session)
    local_session.remove()
    local_session.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


@contextmanager
def db_session():
    local_session = scoped_session(Session)
    session = local_session()

    session.expire_on_commit = False

    try:
        yield session
    except BaseException:
        session.rollback()
        raise
    finally:
        session.close()


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def bulk_upsert_mappings(customers):

    entries_to_update = []
    entries_to_put = []
    with db_session() as sess:
        t0 = time.time()

        # Find all customers that needs to be updated and build mappings
        for each in (
            sess.query(Customer.id).filter(Customer.id.in_(customers.keys())).all()
        ):
            customer = customers.pop(each.id)
            entries_to_update.append({"id": customer["id"], "name": customer["name"]})

        # Bulk mappings for everything that needs to be inserted
        for customer in customers.values():
            entries_to_put.append({"id": customer["id"], "name": customer["name"]})

        sess.bulk_insert_mappings(Customer, entries_to_put)
        sess.bulk_update_mappings(Customer, entries_to_update)
        sess.commit()

    print(
        "Total time for upsert with MAPPING update "
        + str(len(customers))
        + " records "
        + str(time.time() - t0)
        + " sec"
        + " inserted : "
        + str(len(entries_to_put))
        + " - updated : "
        + str(len(entries_to_update))
    )


def bulk_upsert_merge(customers):

    entries_to_update = 0
    entries_to_put = []
    with db_session() as sess:
        t0 = time.time()

        # Find all customers that needs to be updated and merge
        for each in (
            sess.query(Customer.id).filter(Customer.id.in_(customers.keys())).all()
        ):
            values = customers.pop(each.id)
            sess.merge(Customer(id=values["id"], name=values["name"]))
            entries_to_update += 1

        # Bulk mappings for everything that needs to be inserted
        for customer in customers.values():
            entries_to_put.append({"id": customer["id"], "name": customer["name"]})

        sess.bulk_insert_mappings(Customer, entries_to_put)
        sess.commit()

    print(
        "Total time for upsert with MERGE update "
        + str(len(customers))
        + " records "
        + str(time.time() - t0)
        + " sec"
        + " inserted : "
        + str(len(entries_to_put))
        + " - updated : "
        + str(entries_to_update)
    )


if __name__ == "__main__":

    batch_size = 10000

    # Only inserts
    customers_insert = {
        i: {"id": i, "name": "customer_" + str(i)} for i in range(batch_size)
    }

    # 50/50 inserts update
    customers_upsert = {
        i: {"id": i, "name": "customer_2_" + str(i)}
        for i in range(int(batch_size / 2), batch_size + int(batch_size / 2))
    }

    creat_new_database()
    bulk_upsert_mappings(customers_insert.copy())
    bulk_upsert_mappings(customers_upsert.copy())
    bulk_upsert_mappings(customers_insert.copy())

    creat_new_database()
    bulk_upsert_merge(customers_insert.copy())
    bulk_upsert_merge(customers_upsert.copy())
    bulk_upsert_merge(customers_insert.copy())

Результаты теста:

      Total time for upsert with MAPPING: 0.17138004302978516 sec inserted : 10000 - updated : 0
Total time for upsert with MAPPING: 0.22074174880981445 sec inserted : 5000 - updated : 5000
Total time for upsert with MAPPING: 0.22307634353637695 sec inserted : 0 - updated : 10000
Total time for upsert with MERGE: 0.1724097728729248 sec inserted : 10000 - updated : 0
Total time for upsert with MERGE: 7.852903842926025 sec inserted : 5000 - updated : 5000
Total time for upsert with MERGE: 15.11970829963684 sec inserted : 0 - updated : 10000

Я знаю, что это немного поздно, но я построил ответ, данный @Emil Wåreus, и превратил его в функцию, которую можно использовать на любой модели (таблице),

      def upsert_data(self, entries, model, key):
    entries_to_update = []
    entries_to_insert = []
    
    # get all entries to be updated
    for each in session.query(model).filter(getattr(model, key).in_(entries.keys())).all():
        entry = entries.pop(str(getattr(each, key)))
        entries_to_update.append(entry)
        
    # get all entries to be inserted
    for entry in entries.values():
        entries_to_insert.append(entry)

    session.bulk_insert_mappings(model, entries_to_insert)
    session.bulk_update_mappings(model, entries_to_update)

    session.commit()

entriesдолжен быть словарем со значениями первичного ключа в качестве ключей, а значения должны быть сопоставлениями (сопоставлениями значений со столбцами базы данных).

modelэто модель ORM, которую вы хотите обновить.

keyявляется первичным ключом таблицы.

Вы даже можете использовать эту функцию, чтобы получить модель таблицы, в которую вы хотите вставить из строки,

      def get_table(self, table_name):
    for c in self.base._decl_class_registry.values():
        if hasattr(c, '__tablename__') and c.__tablename__ == table_name:
            return c

Используя это, вы можете просто передать имя таблицы в виде строки в upsert_dataфункция,

      def upsert_data(self, entries, table, key):
    model = get_table(table)
    entries_to_update = []
    entries_to_insert = []
    
    # get all entries to be updated
    for each in session.query(model).filter(getattr(model, key).in_(entries.keys())).all():
        entry = entries.pop(str(getattr(each, key)))
        entries_to_update.append(entry)
        
    # get all entries to be inserted
    for entry in entries.values():
        entries_to_insert.append(entry)

    session.bulk_insert_mappings(model, entries_to_insert)
    session.bulk_update_mappings(model, entries_to_update)

    session.commit()

Это не самый безопасный способ, но он очень простой и очень быстрый. Я просто пытался выборочно перезаписать часть таблицы. Я удалил известные строки, которые, как я знал, будут конфликтовать, а затем добавил новые строки из фрейма данных pandas. Имена столбцов фрейма данных pandas должны совпадать с именами столбцов таблицы sql.

eng = create_engine('postgresql://...')
conn = eng.connect()

conn.execute("DELETE FROM my_table WHERE col = %s", val)
df.to_sql('my_table', con=eng, if_exists='append')
Другие вопросы по тегам