Как сделать UPSERT (MERGE, INSERT ... ON DUPLICATE UPDATE) в PostgreSQL?
Очень часто задаваемый вопрос здесь, как сделать upsert, что называется MySQL INSERT ... ON DUPLICATE UPDATE
и стандарт поддерживает как часть MERGE
операция.
Учитывая, что PostgreSQL не поддерживает его напрямую (до pg 9.5), как вы это делаете? Учтите следующее:
CREATE TABLE testtable (
id integer PRIMARY KEY,
somedata text NOT NULL
);
INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');
А теперь представьте, что вы хотите "перевернуть" кортежи (2, 'Joe')
, (3, 'Alan')
поэтому новое содержимое таблицы будет:
(1, 'fred'),
(2, 'Joe'), -- Changed value of existing tuple
(3, 'Alan') -- Added new tuple
Вот о чем говорят люди, когда обсуждают upsert
, Важно отметить, что любой подход должен быть безопасным при наличии нескольких транзакций, работающих на одной и той же таблице - либо с использованием явной блокировки, либо иным образом защищаясь от возникающих условий гонки.
Эта тема широко обсуждается на Вставке, на дубликате обновления в PostgreSQL?, но это об альтернативах синтаксису MySQL, и с течением времени он вырос довольно много не связанных деталей. Я работаю над окончательными ответами.
Эти методы также полезны для "вставить, если не существует, в противном случае ничего не делать", то есть "вставить... при игнорировании дубликата ключа".
7 ответов
9.5 и новее:
Поддержка PostgreSQL 9.5 и новее INSERT ... ON CONFLICT UPDATE
(а также ON CONFLICT DO NOTHING
), т.е.
В сравнении сON DUPLICATE KEY UPDATE
,
Для использования см. Руководство - в частности, предложение конфликта_на синтаксической диаграмме и пояснительный текст.
В отличие от решений для 9.4 и старше, приведенных ниже, эта функция работает с несколькими конфликтующими строками и не требует эксклюзивной блокировки или цикла повторной попытки.
Коммит, добавляющий эту функцию, находится здесь, и обсуждение ее разработки здесь.
Если вы используете 9.5 и вам не нужна обратная совместимость, вы можете прекратить чтение сейчас.
9,4 и старше:
PostgreSQL не имеет встроенного UPSERT
(или же MERGE
), и делать это эффективно в условиях одновременного использования очень сложно.
Эта статья обсуждает проблему в полезных деталях.
В общем, вы должны выбрать один из двух вариантов:
- Отдельные операции вставки / обновления в цикле повтора; или же
- Блокировка таблицы и выполнение пакетного слияния
Индивидуальная петля повторения строки
Использование отдельных загрузчиков строк в цикле повторов является разумным вариантом, если вы хотите, чтобы несколько соединений одновременно пытались выполнить вставки.
Документация PostgreSQL содержит полезную процедуру, которая позволит вам делать это в цикле внутри базы данных. Он защищает от потерянных обновлений и вставки рас, в отличие от большинства наивных решений. Это будет работать только в READ COMMITTED
режим и безопасен только в том случае, если это единственное, что вы делаете в транзакции. Функция не будет работать правильно, если триггеры или вторичные уникальные ключи вызывают уникальные нарушения.
Эта стратегия очень неэффективна. Когда это целесообразно, вы должны поставить работу в очередь и выполнить массовую загрузку, как описано ниже.
Многие попытки решить эту проблему не учитывают откатов, поэтому они приводят к неполным обновлениям. Две транзакции мчатся друг с другом; один из них успешно INSERT
s; другой получает ошибку дублированного ключа и делает UPDATE
вместо. UPDATE
блоки в ожидании INSERT
откатить или зафиксировать. Когда он откатывается, UPDATE
повторная проверка условия соответствует нулю строк, поэтому даже если UPDATE
фиксирует, что это на самом деле не принесло ожидаемого эффекта. Вы должны проверить количество строк результата и при необходимости повторить попытку.
Некоторые попытки решения также не учитывают гонки SELECT. Если вы попробуете очевидное и простое:
-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.
BEGIN;
UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;
-- Remember, this is WRONG. Do NOT COPY IT.
INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);
COMMIT;
затем, когда два запускаются одновременно, есть несколько режимов отказа. Одним из них является уже обсуждаемая проблема с перепроверкой обновлений. Другой, где оба UPDATE
в то же время, сопоставляя ноль строк и продолжая. Тогда они оба делают EXISTS
тест, который происходит до INSERT
, Оба получают ноль строк, поэтому оба INSERT
, Один не удается с ошибкой дубликата ключа.
Вот почему вам нужен повторный цикл. Вы можете подумать, что вы можете предотвратить повторяющиеся ошибки ключа или потерянные обновления с помощью умного SQL, но не можете. Вам нужно проверить количество строк или обработать дубликаты ошибок ключа (в зависимости от выбранного подхода) и повторить попытку.
Пожалуйста, не катите свое собственное решение для этого. Как и в случае с очередями сообщений, это, вероятно, неправильно.
Массовый уперт с замком
Иногда вы хотите выполнить массовое обновление, где у вас есть новый набор данных, который вы хотите объединить в более старый существующий набор данных. Это намного эффективнее, чем отдельные аппроциклы строк, и должно быть предпочтительным, когда это целесообразно.
В этом случае вы обычно выполняете следующий процесс:
CREATE
TEMPORARY
ТаблицаCOPY
или массово вставьте новые данные во временную таблицуLOCK
целевой столIN EXCLUSIVE MODE
, Это позволяет другим транзакциямSELECT
, но не вносите никаких изменений в таблицу.Сделать
UPDATE ... FROM
существующих записей, используя значения во временной таблице;Сделать
INSERT
из строк, которые еще не существуют в целевой таблице;COMMIT
, открыв замок.
Например, для примера, приведенного в вопросе, используется многозначное INSERT
заполнить временную таблицу:
BEGIN;
CREATE TEMPORARY TABLE newvals(id integer, somedata text);
INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');
LOCK TABLE testtable IN EXCLUSIVE MODE;
UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;
INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;
COMMIT;
Связанное чтение
- UPSERT вики-страница
- UPSERTisms в Postgres
- Вставить, при повторном обновлении в PostgreSQL?
- http://petereisentraut.blogspot.com/2010/05/merge-syntax.html
- Подтвердить с транзакцией
- SELECT или INSERT в функции склонны к условиям гонки?
- SQL
MERGE
на PostgreSQL вики - Самый идиоматичный способ внедрения UPSERT в Postgresql на сегодняшний день
Как насчет MERGE
?
SQL-стандарт MERGE
на самом деле имеет плохо определенную семантику параллелизма и не подходит для размещения без предварительной блокировки таблицы.
Это действительно полезный оператор OLAP для объединения данных, но на самом деле он не является полезным решением для безопасного параллелизма. Есть много советов людям, использующим другие СУБД для использования MERGE
для upsts, но это на самом деле неправильно.
Другие БД:
INSERT ... ON DUPLICATE KEY UPDATE
в MySQLMERGE
из MS SQL Server (но см. выше оMERGE
проблемы)MERGE
от Oracle (но см. выше оMERGE
проблемы)
Вот несколько примеров для insert ... on conflict ...
(стр. 9,5+):
Вставь, на конфликт - ничего не делай.
insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict do nothing;
Вставить, при конфликте - выполнить обновление, указать цель конфликта через столбец.
insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict(id) do update set name = 'new_name', size = 3;
Вставить, при конфликте - выполнить обновление, указать цель конфликта через имя ограничения.
insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict on constraint dummy_pkey do update set name = 'new_name', size = 4;
Я пытаюсь внести свой вклад в другое решение для одной проблемы вставки с версиями PostgreSQL до 9.5. Идея состоит в том, чтобы просто попытаться сначала выполнить вставку, и, если запись уже существует, обновить ее:
do $$
begin
insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
update testtable set somedata = 'Joe' where id = 2;
end $$;
Обратите внимание, что это решение может быть применено, только если нет удаления строк таблицы.
Я не знаю об эффективности этого решения, но оно кажется мне достаточно разумным.
MERGE в PostgreSQL версии 15
Начиная с PostgreSQL v. 15, можно использоватьMERGE
команда. На самом деле это было представлено как первое из основных улучшений этой новой версии.
Он используетWHEN MATCHED
/WHEN NOT MATCHED
условное, чтобы выбрать поведение при наличии существующей строки с теми же критериями.
Это даже лучше стандартногоUPSERT
, так как новая функция дает полный контроль надINSERT
,UPDATE
илиDELETE
ряды оптом.
MERGE INTO customer_account ca
USING recent_transactions t
ON t.customer_id = ca.customer_id
WHEN MATCHED THEN
UPDATE SET balance = balance + transaction_value
WHEN NOT MATCHED THEN
INSERT (customer_id, balance)
VALUES (t.customer_id, t.transaction_value)
Поддержка SQLAlchemy для Postgres >=9.5
Поскольку большой пост, описанный выше, охватывает множество различных подходов SQL для версий Postgres (не только не-9.5, как в вопросе), я хотел бы добавить, как это сделать в SQLAlchemy, если вы используете Postgres 9.5. Вместо того, чтобы реализовывать свой собственный эффект, вы также можете использовать функции SQLAlchemy (которые были добавлены в SQLAlchemy 1.1). Лично я бы порекомендовал использовать их, если это возможно. Не только из-за удобства, но и потому, что он позволяет PostgreSQL обрабатывать любые условия гонки, которые могут возникнуть.
Перекрестная публикация из другого ответа, который я дал вчера ( /questions/24015887/kak-sdelat-upsert-s-sqlalchemy/24015897#24015897)
SQLAlchemy поддерживает ON CONFLICT
теперь с двумя методами on_conflict_do_update()
а также on_conflict_do_nothing()
:
Копирование из документации:
from sqlalchemy.dialects.postgresql import insert
stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
index_elements=[my_table.c.user_email],
index_where=my_table.c.user_email.like('%@gmail.com'),
set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)
http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS
Протестировано на Postgresql 9.3
Поскольку этот вопрос был закрыт, я публикую здесь, как вы делаете это с помощью SQLAlchemy. Через рекурсию он повторяет массовую вставку или обновление для борьбы с условиями гонки и ошибками проверки.
Сначала импорт
import itertools as it
from functools import partial
from operator import itemgetter
from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts
Теперь пара вспомогательных функций
def chunk(content, chunksize=None):
"""Groups data into chunks each with (at most) `chunksize` items.
https://stackru.com/a/22919323/408556
"""
if chunksize:
i = iter(content)
generator = (list(it.islice(i, chunksize)) for _ in it.count())
else:
generator = iter([content])
return it.takewhile(bool, generator)
def gen_resources(records):
"""Yields a dictionary if the record's id already exists, a row object
otherwise.
"""
ids = {item[0] for item in session.query(Posts.id)}
for record in records:
is_row = hasattr(record, 'to_dict')
if is_row and record.id in ids:
# It's a row but the id already exists, so we need to convert it
# to a dict that updates the existing record. Since it is duplicate,
# also yield True
yield record.to_dict(), True
elif is_row:
# It's a row and the id doesn't exist, so no conversion needed.
# Since it's not a duplicate, also yield False
yield record, False
elif record['id'] in ids:
# It's a dict and the id already exists, so no conversion needed.
# Since it is duplicate, also yield True
yield record, True
else:
# It's a dict and the id doesn't exist, so we need to convert it.
# Since it's not a duplicate, also yield False
yield Posts(**record), False
И, наконец, функция upsert
def upsert(data, chunksize=None):
for records in chunk(data, chunksize):
resources = gen_resources(records)
sorted_resources = sorted(resources, key=itemgetter(1))
for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
items = [g[0] for g in group]
if dupe:
_upsert = partial(session.bulk_update_mappings, Posts)
else:
_upsert = session.add_all
try:
_upsert(items)
session.commit()
except IntegrityError:
# A record was added or deleted after we checked, so retry
#
# modify accordingly by adding additional exceptions, e.g.,
# except (IntegrityError, ValidationError, ValueError)
db.session.rollback()
upsert(items)
except Exception as e:
# Some other error occurred so reduce chunksize to isolate the
# offending row(s)
db.session.rollback()
num_items = len(items)
if num_items > 1:
upsert(items, num_items // 2)
else:
print('Error adding record {}'.format(items[0]))
Вот как ты это используешь
>>> data = [
... {'id': 1, 'text': 'updated post1'},
... {'id': 5, 'text': 'updated post5'},
... {'id': 1000, 'text': 'new post1000'}]
...
>>> upsert(data)
Преимущество это имеет над bulk_save_objects
является то, что он может обрабатывать отношения, проверку ошибок и т. д. при вставке (в отличие от массовых операций).