Стратегии массового обновления SQLAlchemy
В настоящее время я пишу веб-приложение (Flask), использующее SQLAlchemy (в GAE, подключаясь к облачному Google MySQL), и мне необходимо выполнить массовое обновление таблицы. Короче говоря, выполняется ряд вычислений, в результате чего требуется обновить единственное значение для 1000 объектов. На данный момент я делаю все это в транзакции, но все же в конце, сброс / принятие занимает много времени.
Таблица имеет индекс на id
и все это осуществляется за одну транзакцию. Поэтому я считаю, что я избежал обычных ошибок, но все еще очень медленно.
INFO 2017-01-26 00:45:46,412 log.py:109] UPDATE wallet SET balance=%(balance)s WHERE wallet.id = %(wallet_id)s
2017-01-26 00:45:46,418 INFO sqlalchemy.engine.base.Engine ({'wallet_id': u'3c291a05-e2ed-11e6-9b55-19626d8c7624', 'balance': 1.8711760000000002}, {'wallet_id': u'3c352035-e2ed-11e6-a64c-19626d8c7624', 'balance': 1.5875759999999999}, {'wallet_id': u'3c52c047-e2ed-11e6-a903-19626d8c7624', 'balance': 1.441656}
Насколько я понимаю, нет никакого способа сделать массовое обновление в SQL на самом деле, и приведенный выше оператор в конечном итоге представляет собой несколько операторов UPDATE, отправляемых на сервер.
Я пытался использовать Session.bulk_update_mappings()
но это, похоже, на самом деле ничего не делает:(Не знаю почему, но обновления на самом деле никогда не происходят. Я не вижу примеров использования этого метода (в том числе в пакете производительности), поэтому не уверен, что он предназначен использоваться.
Обсуждаемая мною техника - это массовая вставка в другую таблицу и затем ОБНОВЛЕНИЕ ОБЪЕДИНЕНИЯ. Я дал ему тест, как показано ниже, и он, кажется, значительно быстрее.
wallets = db_session.query(Wallet).all()
ledgers = [ Ledger(id=w.id, amount=w._balance) for w in wallets ]
db_session.bulk_save_objects(ledgers)
db_session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
db_session.execute('TRUNCATE ledger')
Но проблема сейчас в том, как структурировать мой код. Я использую ORM, и мне нужно как-то не "грязный" оригинал Wallet
объекты, чтобы они не были зафиксированы по-старому. Я мог бы просто создать эти Ledger
объекты вместо этого и хранить их список, а затем вручную вставить их в конце моей массовой операции. Но это почти пахнет, как будто я копирую часть работы механизма ORM.
Есть ли более умный способ сделать это? До сих пор мой мозг падает что-то вроде:
class Wallet(Base):
...
_balance = Column(Float)
...
@property
def balance(self):
# first check if we have a ledger of the same id
# and return the amount in that, otherwise...
return self._balance
@balance.setter
def balance(self, amount):
l = Ledger(id=self.id, amount=amount)
# add l to a list somewhere then process later
# At the end of the transaction, do a bulk insert of Ledgers
# and then do an UPDATE JOIN and TRUNCATE
Как я уже сказал, все это, кажется, борется с инструментами, которые у меня (возможно) есть. Есть ли лучший способ справиться с этим? Могу ли я подключиться к механизму ORM, чтобы сделать это? Или есть еще лучший способ сделать массовые обновления?
РЕДАКТИРОВАТЬ: Или может быть что-то умное с событиями и сессиями? Может быть, до этого?
РЕДАКТИРОВАТЬ 2: Итак, я попытался задействовать механизм событий, и теперь у меня есть это:
@event.listens_for(SignallingSession, 'before_flush')
def before_flush(session, flush_context, instances):
ledgers = []
if session.dirty:
for elem in session.dirty:
if ( session.is_modified(elem, include_collections=False) ):
if isinstance(elem, Wallet):
session.expunge(elem)
ledgers.append(Ledger(id=elem.id, amount=elem.balance))
if ledgers:
session.bulk_save_objects(ledgers)
session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
session.execute('TRUNCATE ledger')
Что кажется мне довольно странным и злым, но, похоже, работает хорошо. Есть подводные камни или лучше подходы?
Матф
2 ответа
По сути, вы обходите ORM, чтобы оптимизировать производительность. Поэтому не удивляйтесь, что вы "копируете работу, которую выполняет ORM", потому что это именно то, что вам нужно сделать.
Если у вас нет большого количества мест, где вам нужно делать массовые обновления, подобные этому, я бы рекомендовал не подходить к магическим событиям; просто написание явных запросов гораздо проще.
Я рекомендую использовать ядро SQLAlchemy вместо ORM для обновления:
ledger = Table("ledger", db.metadata,
Column("wallet_id", Integer, primary_key=True),
Column("new_balance", Float),
prefixes=["TEMPORARY"],
)
wallets = db_session.query(Wallet).all()
# figure out new balances
balance_map = {}
for w in wallets:
balance_map[w.id] = calculate_new_balance(w)
# create temp table with balances we need to update
ledger.create(bind=db.session.get_bind())
# insert update data
db.session.execute(ledger.insert().values([{"wallet_id": k, "new_balance": v}
for k, v in balance_map.items()])
# perform update
db.session.execute(Wallet.__table__
.update()
.values(balance=ledger.c.new_balance)
.where(Wallet.__table__.c.id == ledger.c.wallet_id))
# drop temp table
ledger.drop(bind=db.session.get_bind())
# commit changes
db.session.commit()
Как правило, плохой дизайн схемы требует частого обновления тысяч строк. Это в сторону...
План A: Написать код ORM, который генерирует
START TRANSACTION;
UPDATE wallet SET balance = ... WHERE id = ...;
UPDATE wallet SET balance = ... WHERE id = ...;
UPDATE wallet SET balance = ... WHERE id = ...;
...
COMMIT;
План B: написать код ORM, который генерирует
CREATE TEMPORARY TABLE ToDo (
id ...,
new_balance ...
);
INSERT INTO ToDo -- either one row at a time, or a bulk insert
UPDATE wallet
JOIN ToDo USING(id)
SET wallet.balance = ToDo.new_balance; -- bulk update
(Проверьте синтаксис; проверить; и т. Д.)