Как сделать upsert с SqlAlchemy?
У меня есть запись о том, что я хочу существовать в базе данных, если ее там нет, и если она уже есть (первичный ключ существует), я хочу, чтобы поля были обновлены до текущего состояния. Это часто называют упертостью.
Следующий фрагмент неполного кода демонстрирует, что будет работать, но выглядит чрезмерно неуклюжим (особенно если столбцов было намного больше). Что лучше / лучше?
Base = declarative_base()
class Template(Base):
__tablename__ = 'templates'
id = Column(Integer, primary_key = True)
name = Column(String(80), unique = True, index = True)
template = Column(String(80), unique = True)
description = Column(String(200))
def __init__(self, Name, Template, Desc):
self.name = Name
self.template = Template
self.description = Desc
def UpsertDefaultTemplate():
sess = Session()
desired_default = Template("default", "AABBCC", "This is the default template")
try:
q = sess.query(Template).filter_by(name = desiredDefault.name)
existing_default = q.one()
except sqlalchemy.orm.exc.NoResultFound:
#default does not exist yet, so add it...
sess.add(desired_default)
else:
#default already exists. Make sure the values are what we want...
assert isinstance(existing_default, Template)
existing_default.name = desired_default.name
existing_default.template = desired_default.template
existing_default.description = desired_default.description
sess.flush()
Есть ли лучший или менее подробный способ сделать это? Примерно так было бы здорово
sess.upsert_this(desired_default, unique_key = "name")
Хотя unique_key
Очевидно, что kwarg не нужен (ORM должен легко это понять). Я добавил его только потому, что SQLAlchemy работает только с первичным ключом. Например: я смотрел, будет ли применим Session.merge, но это работает только с первичным ключом, который в данном случае является автоинкрементным идентификатором, который не очень полезен для этой цели.
Пример использования для этого просто при запуске серверного приложения, которое могло обновить ожидаемые данные по умолчанию. то есть: нет проблем параллелизма для этого upsert.
11 ответов
SQLAlchemy поддерживает ON CONFLICT
теперь с двумя методами on_conflict_do_update()
а также on_conflict_do_nothing()
:
Копирование из документации:
from sqlalchemy.dialects.postgresql import insert
stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
index_elements=[my_table.c.user_email],
index_where=my_table.c.user_email.like('%@gmail.com'),
set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)
http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict
SQLAlchemy имеет поведение "сохранить или обновить", которое в последних версиях было встроено в session.add
, но ранее был отдельным session.saveorupdate
вызов. Это не "упрек", но может быть достаточно для ваших нужд.
Хорошо, что вы спрашиваете о классе с несколькими уникальными ключами; Я считаю, что именно поэтому нет единственно правильного способа сделать это. Первичный ключ также является уникальным ключом. Если бы не было уникальных ограничений, только первичный ключ, это было бы достаточно простой проблемой: если ничего с данным ID не существует, или если ID - None, создайте новую запись; иначе обновите все остальные поля в существующей записи с этим первичным ключом.
Однако при наличии дополнительных уникальных ограничений возникают логические проблемы с этим простым подходом. Если вы хотите "сохранить" объект, и первичный ключ вашего объекта соответствует существующей записи, а другой уникальный столбец соответствует другой записи, то что вы будете делать? Аналогично, если первичный ключ не соответствует существующей записи, но другой уникальный столбец соответствует существующей записи, то что? Может быть правильный ответ для вашей конкретной ситуации, но в целом я бы сказал, что нет единого правильного ответа.
Это было бы причиной того, что нет встроенной операции "upsert". Приложение должно определить, что это означает в каждом конкретном случае.
В настоящее время SQLAlchemy предоставляет две полезные функции on_conflict_do_nothing
а также on_conflict_do_update
, Эти функции полезны, но требуют перехода от интерфейса ORM к более низкому уровню - ядру SQLAlchemy.
Хотя эти две функции затрудняют использование синтаксиса SQLAlchemy не так сложно, эти функции далеки от того, чтобы предоставить полное готовое решение для апсертирования.
Мой распространенный вариант использования - сохранить большой кусок строк в одном запросе SQL / сеансе. Я обычно сталкиваюсь с двумя проблемами с апсертингом:
Например, функции ORM более высокого уровня, к которым мы привыкли, отсутствуют. Вы не можете использовать объекты ORM, но вместо этого должны предоставить ForeignKey
s во время вставки.
Я использую следующую функцию, которую я написал для решения обеих этих проблем:
def upsert(session, model, rows):
table = model.__table__
stmt = postgresql.insert(table)
primary_keys = [key.name for key in inspect(table).primary_key]
update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
if not update_dict:
raise ValueError("insert_or_update resulted in an empty update_dict")
stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
set_=update_dict)
seen = set()
foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
def handle_foreignkeys_constraints(row):
for c_name, c_value in foreign_keys.items():
foreign_obj = row.pop(c_value.table.name, None)
row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None
for const in unique_constraints:
unique = tuple([const,] + [row[col.name] for col in const.columns])
if unique in seen:
return None
seen.add(unique)
return row
rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
session.execute(stmt, rows)
Я использую подход "посмотри, прежде чем прыгнуть":
# first get the object from the database if it exists
# we're guaranteed to only get one or zero results
# because we're filtering by primary key
switch_command = session.query(Switch_Command).\
filter(Switch_Command.switch_id == switch.id).\
filter(Switch_Command.command_id == command.id).first()
# If we didn't get anything, make one
if not switch_command:
switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)
# update the stuff we care about
switch_command.output = 'Hooray!'
switch_command.lastseen = datetime.datetime.utcnow()
session.add(switch_command)
# This will generate either an INSERT or UPDATE
# depending on whether we have a new object or not
session.commit()
Преимущество в том, что это нейтрально по отношению к БД, и я думаю, что это понятно. Недостатком является то, что в сценарии, подобном следующему, есть потенциальное состояние гонки:
- мы запрашиваем БД для
switch_command
и не найти - мы создаем
switch_command
- другой процесс или поток создает
switch_command
с тем же первичным ключом, что и у нас - мы пытаемся совершить наш
switch_command
Есть несколько ответов, и вот еще один ответ (YAA). Другие ответы не так удобочитаемы из-за задействованного метапрограммирования. Вот пример того, что
Использует SQLAlchemy ORM
Показывает, как создать строку, если строк нет, используя
on_conflict_do_nothing
Показывает, как обновить существующую строку (если есть) без создания новой строки, используя
on_conflict_do_update
Использует первичный ключ таблицы как
constraint
Более длинный пример в исходном вопросе, с чем связан этот код .
import sqlalchemy as sa
import sqlalchemy.orm as orm
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
class PairState(Base):
__tablename__ = "pair_state"
# This table has 1-to-1 relationship with Pair
pair_id = sa.Column(sa.ForeignKey("pair.id"), nullable=False, primary_key=True, unique=True)
pair = orm.relationship(Pair,
backref=orm.backref("pair_state",
lazy="dynamic",
cascade="all, delete-orphan",
single_parent=True, ), )
# First raw event in data stream
first_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))
# Last raw event in data stream
last_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))
# The last hypertable entry added
last_interval_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))
@staticmethod
def create_first_event_if_not_exist(dbsession: Session, pair_id: int, ts: datetime.datetime):
"""Sets the first event value if not exist yet."""
dbsession.execute(
insert(PairState).
values(pair_id=pair_id, first_event_at=ts).
on_conflict_do_nothing()
)
@staticmethod
def update_last_event(dbsession: Session, pair_id: int, ts: datetime.datetime):
"""Replaces the the column last_event_at for a named pair."""
# Based on the original example of https://stackoverflow.com/a/49917004/315168
dbsession.execute(
insert(PairState).
values(pair_id=pair_id, last_event_at=ts).
on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_event_at": ts})
)
@staticmethod
def update_last_interval(dbsession: Session, pair_id: int, ts: datetime.datetime):
"""Replaces the the column last_interval_at for a named pair."""
dbsession.execute(
insert(PairState).
values(pair_id=pair_id, last_interval_at=ts).
on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_interval_at": ts})
)
Ниже работает хорошо для меня с базой данных красного смещения, а также будет работать для комбинированного ограничения первичного ключа.
ИСТОЧНИК: это
Всего несколько изменений, необходимых для создания движка SQLAlchemy в функции def start_engine()
from sqlalchemy import Column, Integer, Date ,Metadata
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql
Base = declarative_base()
def start_engine():
engine = create_engine(os.getenv('SQLALCHEMY_URI',
'postgresql://localhost:5432/upsert'))
connect = engine.connect()
meta = MetaData(bind=engine)
meta.reflect(bind=engine)
return engine
class DigitalSpend(Base):
__tablename__ = 'digital_spend'
report_date = Column(Date, nullable=False)
day = Column(Date, nullable=False, primary_key=True)
impressions = Column(Integer)
conversions = Column(Integer)
def __repr__(self):
return str([getattr(self, c.name, None) for c in self.__table__.c])
def compile_query(query):
compiler = query.compile if not hasattr(query, 'statement') else
query.statement.compile
return compiler(dialect=postgresql.dialect())
def upsert(session, model, rows, as_of_date_col='report_date', no_update_cols=[]):
table = model.__table__
stmt = insert(table).values(rows)
update_cols = [c.name for c in table.c
if c not in list(table.primary_key.columns)
and c.name not in no_update_cols]
on_conflict_stmt = stmt.on_conflict_do_update(
index_elements=table.primary_key.columns,
set_={k: getattr(stmt.excluded, k) for k in update_cols},
index_where=(getattr(model, as_of_date_col) < getattr(stmt.excluded, as_of_date_col))
)
print(compile_query(on_conflict_stmt))
session.execute(on_conflict_stmt)
session = start_engine()
upsert(session, DigitalSpend, initial_rows, no_update_cols=['conversions'])
Это позволяет получить доступ к базовым моделям на основе имен строк
def get_class_by_tablename(tablename):
"""Return class reference mapped to table.
https://stackru.com/questions/11668355/sqlalchemy-get-model-from-table-name-this-may-imply-appending-some-function-to
:param tablename: String with name of table.
:return: Class reference or None.
"""
for c in Base._decl_class_registry.values():
if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
return c
sqla_tbl = get_class_by_tablename(table_name)
def handle_upsert(record_dict, table):
"""
handles updates when there are primary key conflicts
"""
try:
self.active_session().add(table(**record_dict))
except:
# Here we'll assume the error is caused by an integrity error
# We do this because the error classes are passed from the
# underlying package (pyodbc / sqllite) SQLAlchemy doesn't mask
# them with it's own code - this should be updated to have
# explicit error handling for each new db engine
# <update>add explicit error handling for each db engine</update>
active_session.rollback()
# Query for conflic class, use update method to change values based on dict
c_tbl_primary_keys = [i.name for i in table.__table__.primary_key] # List of primary key col names
c_tbl_cols = dict(sqla_tbl.__table__.columns) # String:Col Object crosswalk
c_query_dict = {k:record_dict[k] for k in c_tbl_primary_keys if k in record_dict} # sub-dict from data of primary key:values
c_oo_query_dict = {c_tbl_cols[k]:v for (k,v) in c_query_dict.items()} # col-object:query value for primary key cols
c_target_record = session.query(sqla_tbl).filter(*[k==v for (k,v) in oo_query_dict.items()]).first()
# apply new data values to the existing record
for k, v in record_dict.items()
setattr(c_target_record, k, v)
Поскольку у нас были проблемы с сгенерированными идентификаторами по умолчанию и ссылками, которые приводили к ошибкам ForeignKeyViolation, таким как
update or delete on table "..." violates foreign key constraint
Key (id)=(...) is still referenced from table "...".
нам пришлось исключить идентификатор для словаря обновления, так как в противном случае он всегда будет генерироваться как новое значение по умолчанию.
Кроме того, метод возвращает созданный/обновленный объект.
from sqlalchemy.dialects.postgresql import insert # Important to use the postgresql insert
def upsert(session, data, key_columns, model):
stmt = insert(model).values(data)
# Important to exclude the ID for update!
exclude_for_update = [model.id.name, *key_columns]
update_dict = {c.name: c for c in stmt.excluded if c.name not in exclude_for_update}
stmt = stmt.on_conflict_do_update(
index_elements=key_columns,
set_=update_dict
).returning(model)
orm_stmt = (
select(model)
.from_statement(stmt)
.execution_options(populate_existing=True)
)
return session.execute(orm_stmt).scalar()
Пример:
class UpsertUser(Base):
__tablename__ = 'upsert_user'
id = Column(Id, primary_key=True, default=uuid.uuid4)
name: str = Column(sa.String, nullable=False)
user_sid: str = Column(sa.String, nullable=False, unique=True)
house_admin = relationship('UpsertHouse', back_populates='admin', uselist=False)
class UpsertHouse(Base):
__tablename__ = 'upsert_house'
id = Column(Id, primary_key=True, default=uuid.uuid4)
admin_id: Id = Column(Id, ForeignKey('upsert_user.id'), nullable=False)
admin: UpsertUser = relationship('UpsertUser', back_populates='house_admin', uselist=False)
# Usage
upserted_user = upsert(session, updated_user, [UpsertUser.user_sid.name], UpsertUser)
Примечание. Проверено только на postgresql, но может работать и с другими БД, которые поддерживают ОБНОВЛЕНИЕ КЛЮЧА ДУБЛИКАЦИИ, например, MySQL.
Это работает для меня с sqlite3 и postgres. Хотя он может потерпеть неудачу с объединенными ограничениями первичного ключа и, скорее всего, потерпит неудачу с дополнительными уникальными ограничениями.
try:
t = self._meta.tables[data['table']]
except KeyError:
self._log.error('table "%s" unknown', data['table'])
return
try:
q = insert(t, values=data['values'])
self._log.debug(q)
self._db.execute(q)
except IntegrityError:
self._log.warning('integrity error')
where_clause = [c.__eq__(data['values'][c.name]) for c in t.c if c.primary_key]
update_dict = {c.name: data['values'][c.name] for c in t.c if not c.primary_key}
q = update(t, values=update_dict).where(*where_clause)
self._log.debug(q)
self._db.execute(q)
except Exception as e:
self._log.error('%s: %s', t.name, e)
В случае sqlite,
sqlite_on_conflict='REPLACE'
можно использовать при определении
UniqueConstraint
, а также
sqlite_on_conflict_unique
для уникального ограничения на один столбец. затем
session.add
будет работать так же, как
upsert
. См. официальную документацию .
Я использую этот код для upsert Перед использованием этого кода вы должны добавить первичные ключи в таблицу в базе данных.
from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table
from sqlalchemy.inspection import inspect
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.dialects.postgresql import insert
def upsert(df, engine, table_name, schema=None, chunk_size = 1000):
metadata = MetaData(schema=schema)
metadata.bind = engine
table = Table(table_name, metadata, schema=schema, autoload=True)
# olny use common columns between df and table.
table_columns = {column.name for column in table.columns}
df_columns = set(df.columns)
intersection_columns = table_columns.intersection(df_columns)
df1 = df[intersection_columns]
records = df1.to_dict('records')
# get list of fields making up primary key
primary_keys = [key.name for key in inspect(table).primary_key]
with engine.connect() as conn:
chunks = [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]
for chunk in chunks:
stmt = insert(table).values(chunk)
update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
s = stmt.on_conflict_do_update(
index_elements= primary_keys,
set_=update_dict)
conn.execute(s)