SQLAlchemy - выполнение массового переноса (если существует, обновление, вставка) в postgresql
Я пытаюсь написать массовый переход в Python с помощью модуля SQLAlchemy (не в SQL!).
Я получаю следующую ошибку при добавлении SQLAlchemy:
sqlalchemy.exc.IntegrityError: (IntegrityError) duplicate key value violates unique constraint "posts_pkey"
DETAIL: Key (id)=(TEST1234) already exists.
У меня есть стол posts
с первичным ключом на id
колонка.
В этом примере у меня уже есть строка в БД с id=TEST1234
, Когда я пытаюсь db.session.add()
новый объект сообщения с id
установлен в TEST1234
Я получаю ошибку выше. У меня сложилось впечатление, что если первичный ключ уже существует, запись будет обновлена.
Как я могу использовать Flask-SQLAlchemy только на основе первичного ключа? Есть ли простое решение?
Если нет, я всегда могу проверить и удалить любую запись с соответствующим идентификатором, а затем вставить новую запись, но это кажется дорогим для моей ситуации, когда я не ожидаю много обновлений.
6 ответов
В SQLAlchemy есть операция upsert-esque:
db.session.merge()
После того, как я нашел эту команду, я смог выполнить upserts, но стоит упомянуть, что эта операция медленная для массового "upsert".
Альтернатива состоит в том, чтобы получить список первичных ключей, которые вы хотели бы сохранить, и запросить базу данных для любых соответствующих идентификаторов:
# Imagine that post1, post5, and post1000 are posts objects with ids 1, 5 and 1000 respectively
# The goal is to "upsert" these posts.
# we initialize a dict which maps id to the post object
my_new_posts = {1: post1, 5: post5, 1000: post1000}
for each in posts.query.filter(posts.id.in_(my_new_posts.keys())).all():
# Only merge those posts which already exist in the database
db.session.merge(my_new_posts.pop(each.id))
# Only add those posts which did not exist in the database
db.session.add_all(my_new_posts.values())
# Now we commit our modifications (merges) and inserts (adds) to the database!
db.session.commit()
Как вы отметили это с помощью postgresql
Я предполагаю, что это то, что вы используете, поэтому вы можете использовать on_conflict_do_update
вариант. Вот простой пример:
class Post(Base):
"""
A simple class for demonstration
"""
id = Column(Integer, primary_key=True)
title = Column(Unicode)
# Prepare all the values that should be "upserted" to the DB
values = [
{"id": 1, "title": "mytitle 1"},
{"id": 2, "title": "mytitle 2"},
{"id": 3, "title": "mytitle 3"},
{"id": 4, "title": "mytitle 4"},
]
stmt = insert(Post).values(values)
stmt = stmt.on_conflict_do_update(
# Let's use the constraint name which was visible in the original posts error msg
constraint="post_pkey",
# The columns that should be updated on conflict
set_={
"title": stmt.excluded.title
}
)
session.execute(stmt)
См. Дополнительную информацию в документации PG (например, откуда берется термин "исключенный").
Боковое примечание о повторяющихся именах столбцов
В приведенном выше коде имена столбцов используются как ключи dict как в values
список и аргумент для set_
. Если имя столбца изменено в определении класса, его нужно изменить везде, иначе он сломается. Этого можно избежать, обратившись к определениям столбцов, сделав код немного уродливее, но более надежным:
coldefs = Post.__table__.c
values = [
{coldefs.id.name: 1, coldefs.title.name: "mytitlte 1"},
...
]
...
set_={
coldefs.title.name: stmt.excluded.title
...
}
Альтернативный подход с использованием расширения компиляции ( https://docs.sqlalchemy.org/en/13/core/compiler.html):
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def compile_upsert(insert_stmt, compiler, **kwargs):
"""
converts every SQL insert to an upsert i.e;
INSERT INTO test (foo, bar) VALUES (1, 'a')
becomes:
INSERT INTO test (foo, bar) VALUES (1, 'a') ON CONFLICT(foo) DO UPDATE SET (bar = EXCLUDED.bar)
(assuming foo is a primary key)
:param insert_stmt: Original insert statement
:param compiler: SQL Compiler
:param kwargs: optional arguments
:return: upsert statement
"""
pk = insert_stmt.table.primary_key
insert = compiler.visit_insert(insert_stmt, **kwargs)
ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
updates = ', '.join(f"{c.name}=EXCLUDED.{c.name}" for c in insert_stmt.table.columns)
upsert = ' '.join((insert, ondup, updates))
return upsert
Это должно гарантировать, что все операторы вставки будут вести себя как восходящие. Эта реализация находится на диалекте Postgres, но ее довольно легко изменить для диалекта MySQL.
Я начал смотреть на это и думаю, что нашел довольно эффективный способ делать upserts в sqlalchemy с сочетанием
bulk_insert_mappings
а также
bulk_update_mappings
вместо
merge
.
import time
import sqlite3
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from contextlib import contextmanager
engine = None
Session = sessionmaker()
Base = declarative_base()
def creat_new_database(db_name="sqlite:///bulk_upsert_sqlalchemy.db"):
global engine
engine = create_engine(db_name, echo=False)
local_session = scoped_session(Session)
local_session.remove()
local_session.configure(bind=engine, autoflush=False, expire_on_commit=False)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
@contextmanager
def db_session():
local_session = scoped_session(Session)
session = local_session()
session.expire_on_commit = False
try:
yield session
except BaseException:
session.rollback()
raise
finally:
session.close()
class Customer(Base):
__tablename__ = "customer"
id = Column(Integer, primary_key=True)
name = Column(String(255))
def bulk_upsert_mappings(customers):
entries_to_update = []
entries_to_put = []
with db_session() as sess:
t0 = time.time()
# Find all customers that needs to be updated and build mappings
for each in (
sess.query(Customer.id).filter(Customer.id.in_(customers.keys())).all()
):
customer = customers.pop(each.id)
entries_to_update.append({"id": customer["id"], "name": customer["name"]})
# Bulk mappings for everything that needs to be inserted
for customer in customers.values():
entries_to_put.append({"id": customer["id"], "name": customer["name"]})
sess.bulk_insert_mappings(Customer, entries_to_put)
sess.bulk_update_mappings(Customer, entries_to_update)
sess.commit()
print(
"Total time for upsert with MAPPING update "
+ str(len(customers))
+ " records "
+ str(time.time() - t0)
+ " sec"
+ " inserted : "
+ str(len(entries_to_put))
+ " - updated : "
+ str(len(entries_to_update))
)
def bulk_upsert_merge(customers):
entries_to_update = 0
entries_to_put = []
with db_session() as sess:
t0 = time.time()
# Find all customers that needs to be updated and merge
for each in (
sess.query(Customer.id).filter(Customer.id.in_(customers.keys())).all()
):
values = customers.pop(each.id)
sess.merge(Customer(id=values["id"], name=values["name"]))
entries_to_update += 1
# Bulk mappings for everything that needs to be inserted
for customer in customers.values():
entries_to_put.append({"id": customer["id"], "name": customer["name"]})
sess.bulk_insert_mappings(Customer, entries_to_put)
sess.commit()
print(
"Total time for upsert with MERGE update "
+ str(len(customers))
+ " records "
+ str(time.time() - t0)
+ " sec"
+ " inserted : "
+ str(len(entries_to_put))
+ " - updated : "
+ str(entries_to_update)
)
if __name__ == "__main__":
batch_size = 10000
# Only inserts
customers_insert = {
i: {"id": i, "name": "customer_" + str(i)} for i in range(batch_size)
}
# 50/50 inserts update
customers_upsert = {
i: {"id": i, "name": "customer_2_" + str(i)}
for i in range(int(batch_size / 2), batch_size + int(batch_size / 2))
}
creat_new_database()
bulk_upsert_mappings(customers_insert.copy())
bulk_upsert_mappings(customers_upsert.copy())
bulk_upsert_mappings(customers_insert.copy())
creat_new_database()
bulk_upsert_merge(customers_insert.copy())
bulk_upsert_merge(customers_upsert.copy())
bulk_upsert_merge(customers_insert.copy())
Результаты теста:
Total time for upsert with MAPPING: 0.17138004302978516 sec inserted : 10000 - updated : 0
Total time for upsert with MAPPING: 0.22074174880981445 sec inserted : 5000 - updated : 5000
Total time for upsert with MAPPING: 0.22307634353637695 sec inserted : 0 - updated : 10000
Total time for upsert with MERGE: 0.1724097728729248 sec inserted : 10000 - updated : 0
Total time for upsert with MERGE: 7.852903842926025 sec inserted : 5000 - updated : 5000
Total time for upsert with MERGE: 15.11970829963684 sec inserted : 0 - updated : 10000
Я знаю, что это немного поздно, но я построил ответ, данный @Emil Wåreus, и превратил его в функцию, которую можно использовать на любой модели (таблице),
def upsert_data(self, entries, model, key):
entries_to_update = []
entries_to_insert = []
# get all entries to be updated
for each in session.query(model).filter(getattr(model, key).in_(entries.keys())).all():
entry = entries.pop(str(getattr(each, key)))
entries_to_update.append(entry)
# get all entries to be inserted
for entry in entries.values():
entries_to_insert.append(entry)
session.bulk_insert_mappings(model, entries_to_insert)
session.bulk_update_mappings(model, entries_to_update)
session.commit()
entries
должен быть словарем со значениями первичного ключа в качестве ключей, а значения должны быть сопоставлениями (сопоставлениями значений со столбцами базы данных).
model
это модель ORM, которую вы хотите обновить.
key
является первичным ключом таблицы.
Вы даже можете использовать эту функцию, чтобы получить модель таблицы, в которую вы хотите вставить из строки,
def get_table(self, table_name):
for c in self.base._decl_class_registry.values():
if hasattr(c, '__tablename__') and c.__tablename__ == table_name:
return c
Используя это, вы можете просто передать имя таблицы в виде строки в
upsert_data
функция,
def upsert_data(self, entries, table, key):
model = get_table(table)
entries_to_update = []
entries_to_insert = []
# get all entries to be updated
for each in session.query(model).filter(getattr(model, key).in_(entries.keys())).all():
entry = entries.pop(str(getattr(each, key)))
entries_to_update.append(entry)
# get all entries to be inserted
for entry in entries.values():
entries_to_insert.append(entry)
session.bulk_insert_mappings(model, entries_to_insert)
session.bulk_update_mappings(model, entries_to_update)
session.commit()
Это не самый безопасный способ, но он очень простой и очень быстрый. Я просто пытался выборочно перезаписать часть таблицы. Я удалил известные строки, которые, как я знал, будут конфликтовать, а затем добавил новые строки из фрейма данных pandas. Имена столбцов фрейма данных pandas должны совпадать с именами столбцов таблицы sql.
eng = create_engine('postgresql://...')
conn = eng.connect()
conn.execute("DELETE FROM my_table WHERE col = %s", val)
df.to_sql('my_table', con=eng, if_exists='append')