Стратегии массового обновления SQLAlchemy

В настоящее время я пишу веб-приложение (Flask), использующее SQLAlchemy (в GAE, подключаясь к облачному Google MySQL), и мне необходимо выполнить массовое обновление таблицы. Короче говоря, выполняется ряд вычислений, в результате чего требуется обновить единственное значение для 1000 объектов. На данный момент я делаю все это в транзакции, но все же в конце, сброс / принятие занимает много времени.

Таблица имеет индекс на id и все это осуществляется за одну транзакцию. Поэтому я считаю, что я избежал обычных ошибок, но все еще очень медленно.

INFO     2017-01-26 00:45:46,412 log.py:109] UPDATE wallet SET balance=%(balance)s WHERE wallet.id = %(wallet_id)s
2017-01-26 00:45:46,418 INFO sqlalchemy.engine.base.Engine ({'wallet_id': u'3c291a05-e2ed-11e6-9b55-19626d8c7624', 'balance': 1.8711760000000002}, {'wallet_id': u'3c352035-e2ed-11e6-a64c-19626d8c7624', 'balance': 1.5875759999999999}, {'wallet_id': u'3c52c047-e2ed-11e6-a903-19626d8c7624', 'balance': 1.441656}

Насколько я понимаю, нет никакого способа сделать массовое обновление в SQL на самом деле, и приведенный выше оператор в конечном итоге представляет собой несколько операторов UPDATE, отправляемых на сервер.

Я пытался использовать Session.bulk_update_mappings() но это, похоже, на самом деле ничего не делает:(Не знаю почему, но обновления на самом деле никогда не происходят. Я не вижу примеров использования этого метода (в том числе в пакете производительности), поэтому не уверен, что он предназначен использоваться.

Обсуждаемая мною техника - это массовая вставка в другую таблицу и затем ОБНОВЛЕНИЕ ОБЪЕДИНЕНИЯ. Я дал ему тест, как показано ниже, и он, кажется, значительно быстрее.

wallets = db_session.query(Wallet).all()
ledgers = [ Ledger(id=w.id, amount=w._balance) for w in wallets ]
db_session.bulk_save_objects(ledgers)
db_session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
db_session.execute('TRUNCATE ledger')

Но проблема сейчас в том, как структурировать мой код. Я использую ORM, и мне нужно как-то не "грязный" оригинал Wallet объекты, чтобы они не были зафиксированы по-старому. Я мог бы просто создать эти Ledger объекты вместо этого и хранить их список, а затем вручную вставить их в конце моей массовой операции. Но это почти пахнет, как будто я копирую часть работы механизма ORM.

Есть ли более умный способ сделать это? До сих пор мой мозг падает что-то вроде:

class Wallet(Base):
    ...
    _balance = Column(Float)
    ...

@property
def balance(self):
    # first check if we have a ledger of the same id
    # and return the amount in that, otherwise...
    return self._balance

@balance.setter
def balance(self, amount):
    l = Ledger(id=self.id, amount=amount)
    # add l to a list somewhere then process later

# At the end of the transaction, do a bulk insert of Ledgers
# and then do an UPDATE JOIN and TRUNCATE

Как я уже сказал, все это, кажется, борется с инструментами, которые у меня (возможно) есть. Есть ли лучший способ справиться с этим? Могу ли я подключиться к механизму ORM, чтобы сделать это? Или есть еще лучший способ сделать массовые обновления?

РЕДАКТИРОВАТЬ: Или может быть что-то умное с событиями и сессиями? Может быть, до этого?

РЕДАКТИРОВАТЬ 2: Итак, я попытался задействовать механизм событий, и теперь у меня есть это:

@event.listens_for(SignallingSession, 'before_flush')
def before_flush(session, flush_context, instances):
    ledgers = []

    if session.dirty:
        for elem in session.dirty:
            if ( session.is_modified(elem, include_collections=False) ):
                if isinstance(elem, Wallet):
                    session.expunge(elem)
                    ledgers.append(Ledger(id=elem.id, amount=elem.balance))

    if ledgers:
        session.bulk_save_objects(ledgers)
        session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
        session.execute('TRUNCATE ledger')

Что кажется мне довольно странным и злым, но, похоже, работает хорошо. Есть подводные камни или лучше подходы?

Матф

2 ответа

Решение

По сути, вы обходите ORM, чтобы оптимизировать производительность. Поэтому не удивляйтесь, что вы "копируете работу, которую выполняет ORM", потому что это именно то, что вам нужно сделать.

Если у вас нет большого количества мест, где вам нужно делать массовые обновления, подобные этому, я бы рекомендовал не подходить к магическим событиям; просто написание явных запросов гораздо проще.

Я рекомендую использовать ядро ​​SQLAlchemy вместо ORM для обновления:

ledger = Table("ledger", db.metadata,
    Column("wallet_id", Integer, primary_key=True),
    Column("new_balance", Float),
    prefixes=["TEMPORARY"],
)


wallets = db_session.query(Wallet).all()

# figure out new balances
balance_map = {}
for w in wallets:
    balance_map[w.id] = calculate_new_balance(w)

# create temp table with balances we need to update
ledger.create(bind=db.session.get_bind())

# insert update data
db.session.execute(ledger.insert().values([{"wallet_id": k, "new_balance": v}
                                           for k, v in balance_map.items()])

# perform update
db.session.execute(Wallet.__table__
                         .update()
                         .values(balance=ledger.c.new_balance)
                         .where(Wallet.__table__.c.id == ledger.c.wallet_id))

# drop temp table
ledger.drop(bind=db.session.get_bind())

# commit changes
db.session.commit()

Как правило, плохой дизайн схемы требует частого обновления тысяч строк. Это в сторону...

План A: Написать код ORM, который генерирует

START TRANSACTION;
UPDATE wallet SET balance = ... WHERE id = ...;
UPDATE wallet SET balance = ... WHERE id = ...;
UPDATE wallet SET balance = ... WHERE id = ...;
...
COMMIT;

План B: написать код ORM, который генерирует

CREATE TEMPORARY TABLE ToDo (
    id ...,
    new_balance ...
);
INSERT INTO ToDo -- either one row at a time, or a bulk insert
UPDATE wallet
    JOIN ToDo USING(id)
    SET wallet.balance = ToDo.new_balance;  -- bulk update

(Проверьте синтаксис; проверить; и т. Д.)

Другие вопросы по тегам