Трубы и фильтры на уровне СУБД: разделение выходного потока MERGE
сценарий
У нас есть довольно стандартный процесс импорта данных, в котором мы загружаемstaging
стол, то MERGE
это в target
Таблица.
Новые требования (зеленые) включают сбор подмножества импортированных данных в отдельный queue
стол для совершенно не связанной обработки.
Соревнование"
(1) Подмножество состоит из выбора записей: те, которые были недавно вставлены в target
только стол.
(2) Подмножество является проекцией некоторых из вставленных столбцов, но также по крайней мере одного столбца, который присутствует только в источнике (staging
Таблица).
(3) MERGE
заявление уже использует OUTPUT..INTO
пункт строго для записи $action
взяты MERGE
, то что мы можемPIVOT
результат и COUNT
количество вставок, обновлений и удалений для статистических целей. Нам не очень нравится буферизовать действия для всего набора данных, и мы бы предпочли собирать суммы на лету. Излишне говорить, что мы не хотим добавлять больше данных к этому OUTPUT
Таблица.
(4) Мы не хотим делать соответствующую работу MERGE
выполняет во второй раз по любой причине, даже частично.target
таблица действительно большая, мы не можем индексировать все, и операция обычно довольно дорогая (минуты, а не секунды).
(5) Мы не рассматриваем возможность обхода любого вывода из MERGE
клиенту только для того, чтобы клиент мог направить его к queue
отправив его обратно немедленно. Данные должны оставаться на сервере.
(6) Мы хотим избежать буферизации всего набора данных во временном хранилище между staging
и queue
,
Что было бы лучшим способом сделать это?
Отказы
(а) Требование ставить в очередь только вставленные записи не позволяет нам queue
стол прямо в OUTPUT..INTO
пункт о MERGE
так как это не позволяет WHERE
пункт. Мы можем использовать некоторыеCASE
хитрость, чтобы отметить нежелательные записи для последующего удаления из queue
без обработки, но это кажется сумасшедшим.
(б) Поскольку некоторые столбцы предназначены для queue
не появляются вtarget
таблицу, мы не можем просто добавить триггер вставки в целевой таблице для загрузки queue
, "Разделение потока данных" должно произойти раньше.
(с) Поскольку мы уже используем OUTPUT..INTO
пункт в MERGE
мы не можем добавить секунду OUTPUT
оговорка и гнездо MERGE
вINSERT..SELECT
загрузить очередь либо. Это позор, потому что это выглядит как произвольное ограничение для чего-то, что в противном случае работает очень хорошо; SELECT
фильтрует только записи с$action
мы хотим (INSERT
) а также INSERT
с ними в queue
в одном заявлении. Таким образом, СУБД теоретически может избежать буферизации всего набора данных и просто направить его в queue
, (Примечание: мы не преследовали и, скорее всего, это не оптимизировало план таким образом.)
ситуация
Мы чувствуем, что исчерпали наши возможности, но решили обратиться к Улей. Все, что мы можем придумать, это:
(S1) Создать VIEW
из target
таблица, которая также содержит обнуляемые столбцы для данных, предназначенных для queue
только и иметьSELECT
Заявление определить их как NULL
, Затем настройте INSTEAD OF
триггеры, которые заполняют как target
стол и queue
соответственно. Наконец, подключите MERGE
нацеливаться на вид. Это работает, но мы не поклонники конструкции - это определенновыглядит сложно.
(S2) Откажитесь, буферизируйте весь набор данных во временной таблице, используя другую MERGE..OUTPUT
, После MERGE
, немедленно скопируйте данные (снова!) из временной таблицы в queue
,
6 ответов
Насколько я понимаю, главным препятствием является ограничение OUTPUT
предложение в SQL Server. Это позволяет одному OUTPUT INTO table
и / или один OUTPUT
который возвращает результирующий набор вызывающей стороне.
Вы хотите сохранить результат MERGE
Заявление двумя разными способами:
- все строки, которые были затронуты
MERGE
для сбора статистики - только вставленные строки для
queue
Простой вариант
Я бы использовал ваше решение S2. По крайней мере, для начала. Его легко понять и поддерживать, и он должен быть достаточно эффективным, поскольку наиболее ресурсоемкая операция (MERGE
в Target
Сам будет исполняться только один раз). Ниже представлен второй вариант, и было бы интересно сравнить их производительность с реальными данными.
Так:
- использование
OUTPUT INTO @TempTable
вMERGE
- Или
INSERT
все строки из@TempTable
вStats
или агрегировать перед вставкой. Если все, что вам нужно, это агрегированная статистика, имеет смысл объединить результаты этого пакета и объединить его в окончательный вариант.Stats
вместо копирования всех строк. INSERT
вQueue
только "вставленные" строки из@TempTable
,
Я возьму примерные данные из ответа @i-one.
схема
-- I'll return to commented lines later
CREATE TABLE [dbo].[TestTarget](
-- [ID] [int] IDENTITY(1,1) NOT NULL,
[foo] [varchar](10) NULL,
[bar] [varchar](10) NULL
);
CREATE TABLE [dbo].[TestStaging](
[foo] [varchar](10) NULL,
[bar] [varchar](10) NULL,
[baz] [varchar](10) NULL
);
CREATE TABLE [dbo].[TestStats](
[MergeAction] [nvarchar](10) NOT NULL
);
CREATE TABLE [dbo].[TestQueue](
-- [TargetID] [int] NOT NULL,
[foo] [varchar](10) NULL,
[baz] [varchar](10) NULL
);
Пример данных
TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];
INSERT INTO [dbo].[TestStaging]
([foo]
,[bar]
,[baz])
VALUES
('A', 'AA', 'AAA'),
('B', 'BB', 'BBB'),
('C', 'CC', 'CCC');
INSERT INTO [dbo].[TestTarget]
([foo]
,[bar])
VALUES
('A', 'A_'),
('B', 'B?');
сливаться
DECLARE @TempTable TABLE (
MergeAction nvarchar(10) NOT NULL,
foo varchar(10) NULL,
baz varchar(10) NULL);
MERGE INTO TestTarget AS Dst
USING TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action AS MergeAction, inserted.foo, Src.baz
INTO @TempTable(MergeAction, foo, baz)
;
INSERT INTO [dbo].[TestStats] (MergeAction)
SELECT T.MergeAction
FROM @TempTable AS T;
INSERT INTO [dbo].[TestQueue]
([foo]
,[baz])
SELECT
T.foo
,T.baz
FROM @TempTable AS T
WHERE T.MergeAction = 'INSERT'
;
SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];
Результат
TestTarget
+-----+-----+
| foo | bar |
+-----+-----+
| A | AA |
| B | BB |
| C | CC |
+-----+-----+
TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT |
| UPDATE |
| UPDATE |
+-------------+
TestQueue
+-----+-----+
| foo | baz |
+-----+-----+
| C | CCC |
+-----+-----+
Второй вариант
Протестировано на SQL Server 2014 Express.
OUTPUT
Предложение может отправить свой набор результатов в таблицу и вызывающей стороне. Так, OUTPUT INTO
может пойти в Stats
прямо и если мы обернем MERGE
оператор в хранимую процедуру, то мы можем использовать INSERT ... EXEC
в Queue
,
Если вы изучите план выполнения, вы увидите, что INSERT ... EXEC
в любом случае создает временную таблицу за кулисами (см. также "Скрытые затраты на INSERT EXEC " Адама Маханича), поэтому я ожидаю, что общая производительность будет аналогична первому варианту при явном создании временной таблицы.
Еще одна проблема, которую нужно решить: Queue
Таблица должна иметь только "вставленные" строки, а не все созданные строки. Чтобы добиться этого, вы можете использовать триггер на Queue
таблица для удаления строк, отличных от "вставлен". Еще одна возможность состоит в том, чтобы определить уникальный индекс с IGNORE_DUP_KEY = ON
и подготовить данные таким образом, чтобы "не вставленные" строки нарушали уникальный индекс и не вставлялись в таблицу.
Итак, я добавлю ID IDENTITY
столбец к Target
стол, и я добавлю TargetID
столбец к Queue
Таблица. (Раскомментируйте их в сценарии выше). Кроме того, я добавлю индекс к Queue
Таблица:
CREATE UNIQUE NONCLUSTERED INDEX [IX_TargetID] ON [dbo].[TestQueue]
(
[TargetID] ASC
) WITH (
PAD_INDEX = OFF,
STATISTICS_NORECOMPUTE = OFF,
SORT_IN_TEMPDB = OFF,
IGNORE_DUP_KEY = ON,
DROP_EXISTING = OFF,
ONLINE = OFF,
ALLOW_ROW_LOCKS = ON,
ALLOW_PAGE_LOCKS = ON)
Важная часть UNIQUE
а также IGNORE_DUP_KEY = ON
,
Вот хранимая процедура для MERGE
:
CREATE PROCEDURE [dbo].[TestMerge]
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;
MERGE INTO dbo.TestTarget AS Dst
USING dbo.TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action INTO dbo.TestStats(MergeAction)
OUTPUT CASE WHEN $action = 'INSERT' THEN inserted.ID ELSE 0 END AS TargetID,
inserted.foo,
Src.baz
;
END
использование
TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];
-- Make sure that `Queue` has one special row with TargetID=0 in advance.
INSERT INTO [dbo].[TestQueue]
([TargetID]
,[foo]
,[baz])
VALUES
(0
,NULL
,NULL);
INSERT INTO [dbo].[TestStaging]
([foo]
,[bar]
,[baz])
VALUES
('A', 'AA', 'AAA'),
('B', 'BB', 'BBB'),
('C', 'CC', 'CCC');
INSERT INTO [dbo].[TestTarget]
([foo]
,[bar])
VALUES
('A', 'A_'),
('B', 'B?');
INSERT INTO [dbo].[TestQueue]
EXEC [dbo].[TestMerge];
SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];
Результат
TestTarget
+----+-----+-----+
| ID | foo | bar |
+----+-----+-----+
| 1 | A | AA |
| 2 | B | BB |
| 3 | C | CC |
+----+-----+-----+
TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT |
| UPDATE |
| UPDATE |
+-------------+
TestQueue
+----------+------+------+
| TargetID | foo | baz |
+----------+------+------+
| 0 | NULL | NULL |
| 3 | C | CCC |
+----------+------+------+
Там будет дополнительное сообщение во время INSERT ... EXEC
:
Duplicate key was ignored.
если MERGE
обновил несколько строк. Это предупреждающее сообщение отправляется, когда уникальный индекс отбрасывает некоторые строки во время INSERT
из-за IGNORE_DUP_KEY = ON
,
Предупреждающее сообщение появится, когда дублированные значения ключа будут вставлены в уникальный индекс. Только строки, нарушающие ограничение уникальности, потерпят неудачу.
Рассмотрим следующие два подхода для решения проблемы:
- Объединяйте данные в цель и выводите их в очередь одним оператором и суммируйте статистику в триггере, созданном для цели. Идентификатор партии может быть передан в триггер через временную таблицу.
- Объединяйте данные в целевой объект и выводите их в очередь одним оператором и суммируйте статистику сразу после объединения, используя встроенные возможности отслеживания изменений, вместо того, чтобы делать это в триггере.
Подход 1 (объединить данные и собрать статистику в триггере):
Пример настройки данных (индексы и ограничения опущены для простоты):
create table staging (foo varchar(10), bar varchar(10), baz varchar(10));
create table target (foo varchar(10), bar varchar(10));
create table queue (foo varchar(10), baz varchar(10));
create table stats (batchID int, inserted bigint, updated bigint, deleted bigint);
insert into staging values
('A', 'AA', 'AAA')
,('B', 'BB', 'BBB')
,('C', 'CC', 'CCC')
;
insert into target values
('A', 'A_')
,('B', 'B?')
,('E', 'EE')
;
Триггер для сбора вставленной / обновленной / удаленной статистики:
create trigger target_onChange
on target
after delete, update, insert
as
begin
set nocount on;
if object_id('tempdb..#targetMergeBatch') is NULL
return;
declare @batchID int;
select @batchID = batchID from #targetMergeBatch;
merge into stats t
using (
select
batchID = @batchID,
cntIns = count_big(case when i.foo is not NULL and d.foo is NULL then 1 end),
cntUpd = count_big(case when i.foo is not NULL and d.foo is not NULL then 1 end),
cntDel = count_big(case when i.foo is NULL and d.foo is not NULL then 1 end)
from inserted i
full join deleted d on d.foo = i.foo
) s
on t.batchID = s.batchID
when matched then
update
set
t.inserted = t.inserted + s.cntIns,
t.updated = t.updated + s.cntUpd,
t.deleted = t.deleted + s.cntDel
when not matched then
insert (batchID, inserted, updated, deleted)
values (s.batchID, s.cntIns, s.cntUpd, cntDel);
end
Слияние заявлений:
declare @batchID int;
set @batchID = 1;-- or select @batchID = batchID from ...;
create table #targetMergeBatch (batchID int);
insert into #targetMergeBatch (batchID) values (@batchID);
insert into queue (foo, baz)
select foo, baz
from
(
merge into target t
using staging s
on t.foo = s.foo
when matched then
update
set t.bar = s.bar
when not matched then
insert (foo, bar)
values (s.foo, s.bar)
when not matched by source then
delete
output $action, inserted.foo, s.baz
) m(act, foo, baz)
where act = 'INSERT'
;
drop table #targetMergeBatch
Проверьте результаты:
select * from target;
select * from queue;
select * from stats;
Цель:
foo bar
---------- ----------
A AA
B BB
C CC
Очередь:
foo baz
---------- ----------
C CCC
Статистика:
batchID inserted updated deleted
-------- ---------- --------- ---------
1 1 2 1
Подход 2 (собрать статистику, используя возможности отслеживания изменений):
Пример настройки данных такой же, как и в предыдущем случае (просто отбросьте все, включая триггер и заново создайте таблицы с нуля), за исключением того, что в этом случае нам нужно иметь PK на цели, чтобы образец работал:
create table target (foo varchar(10) primary key, bar varchar(10));
Включить отслеживание изменений в базе данных:
alter database Test
set change_tracking = on
Включить отслеживание изменений на целевой таблице:
alter table target
enable change_tracking
Слияние данных и получение статистики сразу после этого, фильтрация по контексту изменения для подсчета только строк, затронутых слиянием:
begin transaction;
declare @batchID int, @chVersion bigint, @chContext varbinary(128);
set @batchID = 1;-- or select @batchID = batchID from ...;
SET @chVersion = change_tracking_current_version();
set @chContext = newid();
with change_tracking_context(@chContext)
insert into queue (foo, baz)
select foo, baz
from
(
merge into target t
using staging s
on t.foo = s.foo
when matched then
update
set t.bar = s.bar
when not matched then
insert (foo, bar)
values (s.foo, s.bar)
when not matched by source then
delete
output $action, inserted.foo, s.baz
) m(act, foo, baz)
where act = 'INSERT'
;
with ch(foo, op) as (
select foo, sys_change_operation
from changetable(changes target, @chVersion) ct
where sys_change_context = @chContext
)
insert into stats (batchID, inserted, updated, deleted)
select @batchID, [I], [U], [D]
from ch
pivot(count_big(foo) for op in ([I], [U], [D])) pvt
;
commit transaction;
Проверьте результаты:
select * from target;
select * from queue;
select * from stats;
Они такие же, как в предыдущем примере.
Цель:
foo bar
---------- ----------
A AA
B BB
C CC
Очередь:
foo baz
---------- ----------
C CCC
Статистика:
batchID inserted updated deleted
-------- ---------- --------- ---------
1 1 2 1
Я предлагаю извлекать статистику кодирования с использованием трех независимых AFTER INSERT / DELETE / UPDATE
срабатывает по линии:
create trigger dbo.insert_trigger_target
on [dbo].[target]
after insert
as
insert into dbo.[stats] ([action],[count])
select 'insert', count(1)
from inserted;
go
create trigger dbo.update_trigger_target
on [dbo].[target]
after update
as
insert into dbo.[stats] ([action],[count])
select 'update', count(1) from inserted -- or deleted == after / before image, count will be the same
go
create trigger dbo.delete_trigger_target
on [dbo].[target]
after delete
as
insert into dbo.[stats] ([action],[count])
select 'delete', count(1) from deleted
go
Если вам нужно больше контекста, поместите что-нибудь в CONTEXT_INFO
и вырвать его из триггеров.
Теперь я собираюсь утверждать, что триггеры AFTER не такие дорогие, но вам нужно проверить это, чтобы быть уверенным.
Разобравшись с этим, вы сможете свободно использовать OUTPUT
пункт (НЕ OUTPUT INTO
) в MERGE
а затем использовать этот вложенный внутри выбора для подмножества данных, которые вы хотите перейти в queue
Таблица.
обоснование
Из-за необходимости доступа к столбцам из обоих staging
а также target
для того, чтобы построить данные для queue
, это должно быть сделано с помощью OUTPUT
вариант в MERGE
, поскольку ничто другое не имеет доступа к "обеим сторонам".
Тогда, если мы угнали OUTPUT
пункт для queue
Как мы можем переработать эту функциональность? я думаю AFTER
триггеры будут работать, учитывая требования к статистике, которые вы описали. Действительно, статистика может быть довольно сложной, если требуется, учитывая доступные изображения. Я утверждаю, что AFTER
триггеры "не такие дорогие", так как данные как до, так и после всегда должны быть доступны, чтобы транзакция могла быть как ЗАВЕРШЕНА, так и ОБРАТНО ЗАКЛЮЧЕНА - да, данные должны быть отсканированы (даже для подсчета), но это не не кажется слишком большой ценой.
В моем собственном анализе это сканирование добавило около 5% к базовой стоимости плана выполнения
Звучит как решение?
Рассматривали ли вы отказ от слияния и просто сделать вставку, где не существует, и обновление? Затем вы можете использовать предложение output из вставки для заполнения таблицы очередей.
Импорт через промежуточную таблицу может быть более эффективным при последовательной, а не ориентированной на наборы обработке. Я бы подумал переписать MERGE
в хранимую процедуру со сканированием курсора. Тогда для каждой записи вы можете иметь столько выходных данных, сколько вам нужно, плюс любые отсчеты без опоры, общей стоимостью один staging
сканирование таблицы.
Хранимая процедура может также предоставить возможность разделить обработку на более мелкие транзакции, тогда как триггеры на больших наборах данных могут привести к переполнению журнала транзакций.
Если я что-то не упустил, простая команда вставки должна соответствовать всем вашим требованиям.
insert into queue
(foo, baz)
select staging.foo, staging.baz
from staging join target on staging.foo = target.boo
where whatever
Это произойдет после слияния с целью.
Только для новых записей, сделайте это до слияния
insert into queue
(foo, baz)
select staging.foo, staging.baz
from staging left join target on staging.foo = target.boo
where target.foo = null