Как сделать UPSERT (MERGE, INSERT ... ON DUPLICATE UPDATE) в PostgreSQL?

Очень часто задаваемый вопрос здесь, как сделать upsert, что называется MySQL INSERT ... ON DUPLICATE UPDATE и стандарт поддерживает как часть MERGE операция.

Учитывая, что PostgreSQL не поддерживает его напрямую (до pg 9.5), как вы это делаете? Учтите следующее:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

А теперь представьте, что вы хотите "перевернуть" кортежи (2, 'Joe'), (3, 'Alan')поэтому новое содержимое таблицы будет:

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

Вот о чем говорят люди, когда обсуждают upsert, Важно отметить, что любой подход должен быть безопасным при наличии нескольких транзакций, работающих на одной и той же таблице - либо с использованием явной блокировки, либо иным образом защищаясь от возникающих условий гонки.

Эта тема широко обсуждается на Вставке, на дубликате обновления в PostgreSQL?, но это об альтернативах синтаксису MySQL, и с течением времени он вырос довольно много не связанных деталей. Я работаю над окончательными ответами.

Эти методы также полезны для "вставить, если не существует, в противном случае ничего не делать", то есть "вставить... при игнорировании дубликата ключа".

7 ответов

Решение

9.5 и новее:

Поддержка PostgreSQL 9.5 и новее INSERT ... ON CONFLICT UPDATE (а также ON CONFLICT DO NOTHING), т.е.

В сравнении сON DUPLICATE KEY UPDATE,

Быстрое объяснение.

Для использования см. Руководство - в частности, предложение конфликта_на синтаксической диаграмме и пояснительный текст.

В отличие от решений для 9.4 и старше, приведенных ниже, эта функция работает с несколькими конфликтующими строками и не требует эксклюзивной блокировки или цикла повторной попытки.

Коммит, добавляющий эту функцию, находится здесь, и обсуждение ее разработки здесь.


Если вы используете 9.5 и вам не нужна обратная совместимость, вы можете прекратить чтение сейчас.


9,4 и старше:

PostgreSQL не имеет встроенного UPSERT (или же MERGE), и делать это эффективно в условиях одновременного использования очень сложно.

Эта статья обсуждает проблему в полезных деталях.

В общем, вы должны выбрать один из двух вариантов:

  • Отдельные операции вставки / обновления в цикле повтора; или же
  • Блокировка таблицы и выполнение пакетного слияния

Индивидуальная петля повторения строки

Использование отдельных загрузчиков строк в цикле повторов является разумным вариантом, если вы хотите, чтобы несколько соединений одновременно пытались выполнить вставки.

Документация PostgreSQL содержит полезную процедуру, которая позволит вам делать это в цикле внутри базы данных. Он защищает от потерянных обновлений и вставки рас, в отличие от большинства наивных решений. Это будет работать только в READ COMMITTED режим и безопасен только в том случае, если это единственное, что вы делаете в транзакции. Функция не будет работать правильно, если триггеры или вторичные уникальные ключи вызывают уникальные нарушения.

Эта стратегия очень неэффективна. Когда это целесообразно, вы должны поставить работу в очередь и выполнить массовую загрузку, как описано ниже.

Многие попытки решить эту проблему не учитывают откатов, поэтому они приводят к неполным обновлениям. Две транзакции мчатся друг с другом; один из них успешно INSERTs; другой получает ошибку дублированного ключа и делает UPDATE вместо. UPDATE блоки в ожидании INSERT откатить или зафиксировать. Когда он откатывается, UPDATE повторная проверка условия соответствует нулю строк, поэтому даже если UPDATE фиксирует, что это на самом деле не принесло ожидаемого эффекта. Вы должны проверить количество строк результата и при необходимости повторить попытку.

Некоторые попытки решения также не учитывают гонки SELECT. Если вы попробуете очевидное и простое:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

затем, когда два запускаются одновременно, есть несколько режимов отказа. Одним из них является уже обсуждаемая проблема с перепроверкой обновлений. Другой, где оба UPDATE в то же время, сопоставляя ноль строк и продолжая. Тогда они оба делают EXISTS тест, который происходит до INSERT, Оба получают ноль строк, поэтому оба INSERT, Один не удается с ошибкой дубликата ключа.

Вот почему вам нужен повторный цикл. Вы можете подумать, что вы можете предотвратить повторяющиеся ошибки ключа или потерянные обновления с помощью умного SQL, но не можете. Вам нужно проверить количество строк или обработать дубликаты ошибок ключа (в зависимости от выбранного подхода) и повторить попытку.

Пожалуйста, не катите свое собственное решение для этого. Как и в случае с очередями сообщений, это, вероятно, неправильно.

Массовый уперт с замком

Иногда вы хотите выполнить массовое обновление, где у вас есть новый набор данных, который вы хотите объединить в более старый существующий набор данных. Это намного эффективнее, чем отдельные аппроциклы строк, и должно быть предпочтительным, когда это целесообразно.

В этом случае вы обычно выполняете следующий процесс:

  • CREATE TEMPORARY Таблица

  • COPY или массово вставьте новые данные во временную таблицу

  • LOCK целевой стол IN EXCLUSIVE MODE, Это позволяет другим транзакциям SELECT, но не вносите никаких изменений в таблицу.

  • Сделать UPDATE ... FROM существующих записей, используя значения во временной таблице;

  • Сделать INSERT из строк, которые еще не существуют в целевой таблице;

  • COMMIT, открыв замок.

Например, для примера, приведенного в вопросе, используется многозначное INSERT заполнить временную таблицу:

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

Связанное чтение

Как насчет MERGE?

SQL-стандарт MERGE на самом деле имеет плохо определенную семантику параллелизма и не подходит для размещения без предварительной блокировки таблицы.

Это действительно полезный оператор OLAP для объединения данных, но на самом деле он не является полезным решением для безопасного параллелизма. Есть много советов людям, использующим другие СУБД для использования MERGE для upsts, но это на самом деле неправильно.

Другие БД:

Вот несколько примеров для insert ... on conflict ... (стр. 9,5+):

  • Вставь, на конфликт - ничего не делай.
    insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict do nothing;

  • Вставить, при конфликте - выполнить обновление, указать цель конфликта через столбец.
    insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict(id) do update set name = 'new_name', size = 3;

  • Вставить, при конфликте - выполнить обновление, указать цель конфликта через имя ограничения.
    insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict on constraint dummy_pkey do update set name = 'new_name', size = 4;

Я пытаюсь внести свой вклад в другое решение для одной проблемы вставки с версиями PostgreSQL до 9.5. Идея состоит в том, чтобы просто попытаться сначала выполнить вставку, и, если запись уже существует, обновить ее:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

Обратите внимание, что это решение может быть применено, только если нет удаления строк таблицы.

Я не знаю об эффективности этого решения, но оно кажется мне достаточно разумным.

MERGE в PostgreSQL версии 15

Начиная с PostgreSQL v. 15, можно использоватьMERGEкоманда. На самом деле это было представлено как первое из основных улучшений этой новой версии.

Он используетWHEN MATCHED/WHEN NOT MATCHEDусловное, чтобы выбрать поведение при наличии существующей строки с теми же критериями.

Это даже лучше стандартногоUPSERT, так как новая функция дает полный контроль надINSERT,UPDATEилиDELETEряды оптом.

      MERGE INTO customer_account ca
USING recent_transactions t
ON t.customer_id = ca.customer_id
WHEN MATCHED THEN
  UPDATE SET balance = balance + transaction_value
WHEN NOT MATCHED THEN
  INSERT (customer_id, balance)
  VALUES (t.customer_id, t.transaction_value)

Поддержка SQLAlchemy для Postgres >=9.5

Поскольку большой пост, описанный выше, охватывает множество различных подходов SQL для версий Postgres (не только не-9.5, как в вопросе), я хотел бы добавить, как это сделать в SQLAlchemy, если вы используете Postgres 9.5. Вместо того, чтобы реализовывать свой собственный эффект, вы также можете использовать функции SQLAlchemy (которые были добавлены в SQLAlchemy 1.1). Лично я бы порекомендовал использовать их, если это возможно. Не только из-за удобства, но и потому, что он позволяет PostgreSQL обрабатывать любые условия гонки, которые могут возникнуть.

Перекрестная публикация из другого ответа, который я дал вчера ( /questions/24015887/kak-sdelat-upsert-s-sqlalchemy/24015897#24015897)

SQLAlchemy поддерживает ON CONFLICT теперь с двумя методами on_conflict_do_update() а также on_conflict_do_nothing():

Копирование из документации:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict

WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

Протестировано на Postgresql 9.3

Поскольку этот вопрос был закрыт, я публикую здесь, как вы делаете это с помощью SQLAlchemy. Через рекурсию он повторяет массовую вставку или обновление для борьбы с условиями гонки и ошибками проверки.

Сначала импорт

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

Теперь пара вспомогательных функций

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackru.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

И, наконец, функция upsert

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

Вот как ты это используешь

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

Преимущество это имеет над bulk_save_objects является то, что он может обрабатывать отношения, проверку ошибок и т. д. при вставке (в отличие от массовых операций).

Другие вопросы по тегам