Трубы и фильтры на уровне СУБД: разделение выходного потока MERGE

сценарий

У нас есть довольно стандартный процесс импорта данных, в котором мы загружаемstaging стол, то MERGE это в target Таблица.

Новые требования (зеленые) включают сбор подмножества импортированных данных в отдельный queue стол для совершенно не связанной обработки.

Схема сценария

Соревнование"

(1) Подмножество состоит из выбора записей: те, которые были недавно вставлены в target только стол.

(2) Подмножество является проекцией некоторых из вставленных столбцов, но также по крайней мере одного столбца, который присутствует только в источнике (stagingТаблица).

(3) MERGE заявление уже использует OUTPUT..INTO пункт строго для записи $actionвзяты MERGE, то что мы можемPIVOT результат и COUNT количество вставок, обновлений и удалений для статистических целей. Нам не очень нравится буферизовать действия для всего набора данных, и мы бы предпочли собирать суммы на лету. Излишне говорить, что мы не хотим добавлять больше данных к этому OUTPUT Таблица.

(4) Мы не хотим делать соответствующую работу MERGEвыполняет во второй раз по любой причине, даже частично.target таблица действительно большая, мы не можем индексировать все, и операция обычно довольно дорогая (минуты, а не секунды).

(5) Мы не рассматриваем возможность обхода любого вывода из MERGE клиенту только для того, чтобы клиент мог направить его к queue отправив его обратно немедленно. Данные должны оставаться на сервере.

(6) Мы хотим избежать буферизации всего набора данных во временном хранилище между staging и queue,

Что было бы лучшим способом сделать это?

Отказы

(а) Требование ставить в очередь только вставленные записи не позволяет нам queue стол прямо в OUTPUT..INTO пункт о MERGEтак как это не позволяет WHERE пункт. Мы можем использовать некоторыеCASE хитрость, чтобы отметить нежелательные записи для последующего удаления из queue без обработки, но это кажется сумасшедшим.

(б) Поскольку некоторые столбцы предназначены для queue не появляются вtarget таблицу, мы не можем просто добавить триггер вставки в целевой таблице для загрузки queue, "Разделение потока данных" должно произойти раньше.

(с) Поскольку мы уже используем OUTPUT..INTO пункт в MERGEмы не можем добавить секунду OUTPUT оговорка и гнездо MERGE вINSERT..SELECT загрузить очередь либо. Это позор, потому что это выглядит как произвольное ограничение для чего-то, что в противном случае работает очень хорошо; SELECT фильтрует только записи с$action мы хотим (INSERT) а также INSERTс ними в queue в одном заявлении. Таким образом, СУБД теоретически может избежать буферизации всего набора данных и просто направить его в queue, (Примечание: мы не преследовали и, скорее всего, это не оптимизировало план таким образом.)

ситуация

Мы чувствуем, что исчерпали наши возможности, но решили обратиться к Улей. Все, что мы можем придумать, это:

(S1) Создать VIEW из target таблица, которая также содержит обнуляемые столбцы для данных, предназначенных для queue только и иметьSELECT Заявление определить их как NULL, Затем настройте INSTEAD OFтриггеры, которые заполняют как target стол и queueсоответственно. Наконец, подключите MERGE нацеливаться на вид. Это работает, но мы не поклонники конструкции - это определенновыглядит сложно.

(S2) Откажитесь, буферизируйте весь набор данных во временной таблице, используя другую MERGE..OUTPUT, После MERGE, немедленно скопируйте данные (снова!) из временной таблицы в queue,

6 ответов

Насколько я понимаю, главным препятствием является ограничение OUTPUT предложение в SQL Server. Это позволяет одному OUTPUT INTO table и / или один OUTPUT который возвращает результирующий набор вызывающей стороне.

Вы хотите сохранить результат MERGE Заявление двумя разными способами:

  • все строки, которые были затронуты MERGE для сбора статистики
  • только вставленные строки для queue

Простой вариант

Я бы использовал ваше решение S2. По крайней мере, для начала. Его легко понять и поддерживать, и он должен быть достаточно эффективным, поскольку наиболее ресурсоемкая операция (MERGE в Target Сам будет исполняться только один раз). Ниже представлен второй вариант, и было бы интересно сравнить их производительность с реальными данными.

Так:

  • использование OUTPUT INTO @TempTable в MERGE
  • Или INSERT все строки из @TempTable в Stats или агрегировать перед вставкой. Если все, что вам нужно, это агрегированная статистика, имеет смысл объединить результаты этого пакета и объединить его в окончательный вариант. Stats вместо копирования всех строк.
  • INSERT в Queue только "вставленные" строки из @TempTable,

Я возьму примерные данные из ответа @i-one.

схема

-- I'll return to commented lines later

CREATE TABLE [dbo].[TestTarget](
    -- [ID] [int] IDENTITY(1,1) NOT NULL,
    [foo] [varchar](10) NULL,
    [bar] [varchar](10) NULL
);

CREATE TABLE [dbo].[TestStaging](
    [foo] [varchar](10) NULL,
    [bar] [varchar](10) NULL,
    [baz] [varchar](10) NULL
);

CREATE TABLE [dbo].[TestStats](
    [MergeAction] [nvarchar](10) NOT NULL
);

CREATE TABLE [dbo].[TestQueue](
    -- [TargetID] [int] NOT NULL,
    [foo] [varchar](10) NULL,
    [baz] [varchar](10) NULL
);

Пример данных

TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];

INSERT INTO [dbo].[TestStaging]
    ([foo]
    ,[bar]
    ,[baz])
VALUES
    ('A', 'AA', 'AAA'),
    ('B', 'BB', 'BBB'),
    ('C', 'CC', 'CCC');

INSERT INTO [dbo].[TestTarget]
    ([foo]
    ,[bar])
VALUES
    ('A', 'A_'),
    ('B', 'B?');

сливаться

DECLARE @TempTable TABLE (
    MergeAction nvarchar(10) NOT NULL,
    foo varchar(10) NULL,
    baz varchar(10) NULL);

MERGE INTO TestTarget AS Dst
USING TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
    Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action AS MergeAction, inserted.foo, Src.baz
INTO @TempTable(MergeAction, foo, baz)
;

INSERT INTO [dbo].[TestStats] (MergeAction)
SELECT T.MergeAction
FROM @TempTable AS T;

INSERT INTO [dbo].[TestQueue]
    ([foo]
    ,[baz])
SELECT
    T.foo
    ,T.baz
FROM @TempTable AS T
WHERE T.MergeAction = 'INSERT'
;

SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];

Результат

TestTarget
+-----+-----+
| foo | bar |
+-----+-----+
| A   | AA  |
| B   | BB  |
| C   | CC  |
+-----+-----+

TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT      |
| UPDATE      |
| UPDATE      |
+-------------+

TestQueue
+-----+-----+
| foo | baz |
+-----+-----+
| C   | CCC |
+-----+-----+

Второй вариант

Протестировано на SQL Server 2014 Express.

OUTPUT Предложение может отправить свой набор результатов в таблицу и вызывающей стороне. Так, OUTPUT INTO может пойти в Stats прямо и если мы обернем MERGE оператор в хранимую процедуру, то мы можем использовать INSERT ... EXEC в Queue,

Если вы изучите план выполнения, вы увидите, что INSERT ... EXEC в любом случае создает временную таблицу за кулисами (см. также "Скрытые затраты на INSERT EXEC " Адама Маханича), поэтому я ожидаю, что общая производительность будет аналогична первому варианту при явном создании временной таблицы.

Еще одна проблема, которую нужно решить: Queue Таблица должна иметь только "вставленные" строки, а не все созданные строки. Чтобы добиться этого, вы можете использовать триггер на Queue таблица для удаления строк, отличных от "вставлен". Еще одна возможность состоит в том, чтобы определить уникальный индекс с IGNORE_DUP_KEY = ON и подготовить данные таким образом, чтобы "не вставленные" строки нарушали уникальный индекс и не вставлялись в таблицу.

Итак, я добавлю ID IDENTITY столбец к Target стол, и я добавлю TargetID столбец к Queue Таблица. (Раскомментируйте их в сценарии выше). Кроме того, я добавлю индекс к Queue Таблица:

CREATE UNIQUE NONCLUSTERED INDEX [IX_TargetID] ON [dbo].[TestQueue]
(
    [TargetID] ASC
) WITH (
PAD_INDEX = OFF, 
STATISTICS_NORECOMPUTE = OFF, 
SORT_IN_TEMPDB = OFF, 
IGNORE_DUP_KEY = ON, 
DROP_EXISTING = OFF, 
ONLINE = OFF, 
ALLOW_ROW_LOCKS = ON, 
ALLOW_PAGE_LOCKS = ON)

Важная часть UNIQUE а также IGNORE_DUP_KEY = ON,

Вот хранимая процедура для MERGE:

CREATE PROCEDURE [dbo].[TestMerge]
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    MERGE INTO dbo.TestTarget AS Dst
    USING dbo.TestStaging AS Src
    ON Dst.foo = Src.foo
    WHEN MATCHED THEN
    UPDATE SET
        Dst.bar = Src.bar
    WHEN NOT MATCHED BY TARGET THEN
    INSERT (foo, bar)
    VALUES (Src.foo, Src.bar)
    OUTPUT $action INTO dbo.TestStats(MergeAction)
    OUTPUT CASE WHEN $action = 'INSERT' THEN inserted.ID ELSE 0 END AS TargetID, 
    inserted.foo,
    Src.baz
    ;

END

использование

TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];

-- Make sure that `Queue` has one special row with TargetID=0 in advance.
INSERT INTO [dbo].[TestQueue]
    ([TargetID]
    ,[foo]
    ,[baz])
VALUES
    (0
    ,NULL
    ,NULL);

INSERT INTO [dbo].[TestStaging]
    ([foo]
    ,[bar]
    ,[baz])
VALUES
    ('A', 'AA', 'AAA'),
    ('B', 'BB', 'BBB'),
    ('C', 'CC', 'CCC');

INSERT INTO [dbo].[TestTarget]
    ([foo]
    ,[bar])
VALUES
    ('A', 'A_'),
    ('B', 'B?');

INSERT INTO [dbo].[TestQueue]
EXEC [dbo].[TestMerge];

SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];

Результат

TestTarget
+----+-----+-----+
| ID | foo | bar |
+----+-----+-----+
|  1 | A   | AA  |
|  2 | B   | BB  |
|  3 | C   | CC  |
+----+-----+-----+

TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT      |
| UPDATE      |
| UPDATE      |
+-------------+

TestQueue
+----------+------+------+
| TargetID | foo  | baz  |
+----------+------+------+
|        0 | NULL | NULL |
|        3 | C    | CCC  |
+----------+------+------+

Там будет дополнительное сообщение во время INSERT ... EXEC:

Duplicate key was ignored.

если MERGE обновил несколько строк. Это предупреждающее сообщение отправляется, когда уникальный индекс отбрасывает некоторые строки во время INSERT из-за IGNORE_DUP_KEY = ON,

Предупреждающее сообщение появится, когда дублированные значения ключа будут вставлены в уникальный индекс. Только строки, нарушающие ограничение уникальности, потерпят неудачу.

Рассмотрим следующие два подхода для решения проблемы:

  • Объединяйте данные в цель и выводите их в очередь одним оператором и суммируйте статистику в триггере, созданном для цели. Идентификатор партии может быть передан в триггер через временную таблицу.
  • Объединяйте данные в целевой объект и выводите их в очередь одним оператором и суммируйте статистику сразу после объединения, используя встроенные возможности отслеживания изменений, вместо того, чтобы делать это в триггере.

Подход 1 (объединить данные и собрать статистику в триггере):

Пример настройки данных (индексы и ограничения опущены для простоты):

create table staging (foo varchar(10), bar varchar(10), baz varchar(10));
create table target (foo varchar(10), bar varchar(10));
create table queue (foo varchar(10), baz varchar(10));
create table stats (batchID int, inserted bigint, updated bigint, deleted bigint);

insert into staging values
    ('A', 'AA', 'AAA')
    ,('B', 'BB', 'BBB')
    ,('C', 'CC', 'CCC')
    ;

insert into target values
    ('A', 'A_')
    ,('B', 'B?')
    ,('E', 'EE')
    ;

Триггер для сбора вставленной / обновленной / удаленной статистики:

create trigger target_onChange
on target
after delete, update, insert
as
begin
    set nocount on;

    if object_id('tempdb..#targetMergeBatch') is NULL
        return;

    declare @batchID int;
    select @batchID = batchID from #targetMergeBatch;

    merge into stats t
    using (
        select
            batchID = @batchID,
            cntIns = count_big(case when i.foo is not NULL and d.foo is NULL then 1 end),
            cntUpd = count_big(case when i.foo is not NULL and d.foo is not NULL then 1 end),
            cntDel = count_big(case when i.foo is NULL and d.foo is not NULL then 1 end)
        from inserted i
            full join deleted d on d.foo = i.foo
    ) s
    on t.batchID = s.batchID
    when matched then
        update
        set
            t.inserted = t.inserted + s.cntIns,
            t.updated = t.updated + s.cntUpd,
            t.deleted = t.deleted + s.cntDel
    when not matched then
        insert (batchID, inserted, updated, deleted)
        values (s.batchID, s.cntIns, s.cntUpd, cntDel);

end

Слияние заявлений:

declare @batchID int;
set @batchID = 1;-- or select @batchID = batchID from ...;

create table #targetMergeBatch (batchID int);
insert into #targetMergeBatch (batchID) values (@batchID);

insert into queue (foo, baz)
select foo, baz
from
(
    merge into target t
    using staging s
    on t.foo = s.foo
    when matched then
        update
        set t.bar = s.bar
    when not matched then
        insert (foo, bar)
        values (s.foo, s.bar)
    when not matched by source then
        delete
    output $action, inserted.foo, s.baz
) m(act, foo, baz)
where act = 'INSERT'
    ;

drop table #targetMergeBatch

Проверьте результаты:

select * from target;
select * from queue;
select * from stats;

Цель:

foo        bar
---------- ----------
A          AA
B          BB
C          CC

Очередь:

foo        baz
---------- ----------
C          CCC

Статистика:

batchID  inserted   updated   deleted
-------- ---------- --------- ---------
1        1          2         1

Подход 2 (собрать статистику, используя возможности отслеживания изменений):

Пример настройки данных такой же, как и в предыдущем случае (просто отбросьте все, включая триггер и заново создайте таблицы с нуля), за исключением того, что в этом случае нам нужно иметь PK на цели, чтобы образец работал:

create table target (foo varchar(10) primary key, bar varchar(10));

Включить отслеживание изменений в базе данных:

alter database Test
    set change_tracking = on

Включить отслеживание изменений на целевой таблице:

alter table target
    enable change_tracking

Слияние данных и получение статистики сразу после этого, фильтрация по контексту изменения для подсчета только строк, затронутых слиянием:

begin transaction;
declare @batchID int, @chVersion bigint, @chContext varbinary(128);
set @batchID = 1;-- or select @batchID = batchID from ...;
SET @chVersion = change_tracking_current_version();
set @chContext = newid();

with change_tracking_context(@chContext)
insert into queue (foo, baz)
select foo, baz
from
(
    merge into target t
    using staging s
    on t.foo = s.foo
    when matched then
        update
        set t.bar = s.bar
    when not matched then
        insert (foo, bar)
        values (s.foo, s.bar)
    when not matched by source then
        delete
    output $action, inserted.foo, s.baz
) m(act, foo, baz)
where act = 'INSERT'
    ;

with ch(foo, op) as (
    select foo, sys_change_operation
    from changetable(changes target, @chVersion) ct
    where sys_change_context = @chContext
)
insert into stats (batchID, inserted, updated, deleted)
select @batchID, [I], [U], [D]
from ch
    pivot(count_big(foo) for op in ([I], [U], [D])) pvt
    ;

commit transaction;

Проверьте результаты:

select * from target;
select * from queue;
select * from stats;

Они такие же, как в предыдущем примере.

Цель:

foo        bar
---------- ----------
A          AA
B          BB
C          CC

Очередь:

foo        baz
---------- ----------
C          CCC

Статистика:

batchID  inserted   updated   deleted
-------- ---------- --------- ---------
1        1          2         1

Я предлагаю извлекать статистику кодирования с использованием трех независимых AFTER INSERT / DELETE / UPDATE срабатывает по линии:

create trigger dbo.insert_trigger_target
on [dbo].[target]
after insert
as
insert into dbo.[stats] ([action],[count])
select 'insert', count(1)
from inserted;
go

create trigger dbo.update_trigger_target
on [dbo].[target]
after update
as
insert into dbo.[stats] ([action],[count])
select 'update', count(1) from inserted -- or deleted == after / before image, count will be the same
go

create trigger dbo.delete_trigger_target
on [dbo].[target]
after delete
as
insert into dbo.[stats] ([action],[count])
select 'delete', count(1) from deleted
go

Если вам нужно больше контекста, поместите что-нибудь в CONTEXT_INFO и вырвать его из триггеров.

Теперь я собираюсь утверждать, что триггеры AFTER не такие дорогие, но вам нужно проверить это, чтобы быть уверенным.

Разобравшись с этим, вы сможете свободно использовать OUTPUT пункт (НЕ OUTPUT INTO) в MERGE а затем использовать этот вложенный внутри выбора для подмножества данных, которые вы хотите перейти в queue Таблица.

обоснование

Из-за необходимости доступа к столбцам из обоих staging а также target для того, чтобы построить данные для queue, это должно быть сделано с помощью OUTPUT вариант в MERGE, поскольку ничто другое не имеет доступа к "обеим сторонам".

Тогда, если мы угнали OUTPUT пункт для queue Как мы можем переработать эту функциональность? я думаю AFTER триггеры будут работать, учитывая требования к статистике, которые вы описали. Действительно, статистика может быть довольно сложной, если требуется, учитывая доступные изображения. Я утверждаю, что AFTER триггеры "не такие дорогие", так как данные как до, так и после всегда должны быть доступны, чтобы транзакция могла быть как ЗАВЕРШЕНА, так и ОБРАТНО ЗАКЛЮЧЕНА - да, данные должны быть отсканированы (даже для подсчета), но это не не кажется слишком большой ценой.

В моем собственном анализе это сканирование добавило около 5% к базовой стоимости плана выполнения

Звучит как решение?

Рассматривали ли вы отказ от слияния и просто сделать вставку, где не существует, и обновление? Затем вы можете использовать предложение output из вставки для заполнения таблицы очередей.

Импорт через промежуточную таблицу может быть более эффективным при последовательной, а не ориентированной на наборы обработке. Я бы подумал переписать MERGE в хранимую процедуру со сканированием курсора. Тогда для каждой записи вы можете иметь столько выходных данных, сколько вам нужно, плюс любые отсчеты без опоры, общей стоимостью один staging сканирование таблицы.

Хранимая процедура может также предоставить возможность разделить обработку на более мелкие транзакции, тогда как триггеры на больших наборах данных могут привести к переполнению журнала транзакций.

Если я что-то не упустил, простая команда вставки должна соответствовать всем вашим требованиям.

insert into queue
(foo, baz)
select staging.foo, staging.baz
from staging join target on staging.foo = target.boo
where whatever

Это произойдет после слияния с целью.

Только для новых записей, сделайте это до слияния

insert into queue
(foo, baz)
select staging.foo, staging.baz
from staging left join target on staging.foo = target.boo
where target.foo = null
Другие вопросы по тегам