SQL-импорт файла расширенных событий с использованием sys.fn_xe_file_target_read_file, как получить значения только с момента последнего импорта

Я использую SQL Server 2012

У меня есть продолжительное расширенное событие (работает в течение нескольких дней для записи событий), которое сохраняется в файл.xel.

У меня есть задание, которое периодически запускается для импорта данных в промежуточную таблицу. Я только импортирую XML event_data столбец из файла, чтобы я мог разобрать нужные мне поля XML и сохранить в таблицу для отчетов.

Я знаю, когда в последний раз я запускал импорт, поэтому я хочу посмотреть, могу ли я выбрать только те записи из файла, которые были добавлены с момента последнего запуска процесса импорта.

Сейчас он работает, но он импортирует ВСЕ записи из файлов в промежуточные таблицы, анализирует нужные мне поля (включая метку времени), а затем импортирует только те записи, которые имеют метку времени с момента последнего выполнения задания.

Мой процесс вставляет только новые с момента последнего запуска задания, поэтому все работает нормально, но выполняет большую часть работы, импортируя и анализируя XML для ВСЕХ записей в файле, включая те, которые я уже импортировал в последний раз, когда выполнялось задание,

Поэтому я хочу найти способ вообще не импортировать из файла, если он уже был импортирован, или, по крайней мере, не нужно анализировать XML для уже импортированных записей (хотя мне нужно проанализировать его сейчас, чтобы получить метку времени для исключить уже обработанные).

Ниже приведено то, что у меня есть, и, как я уже сказал, это работает, но выполняет много дополнительной работы, если я могу найти способ пропустить те, которые я уже импортировал.

Я включил в свой процесс только те шаги, по которым мне нужна помощь:

-- pull data from file path and insert into staging table
INSERT INTO #CaptureObjectUsageFileData (event_data)
SELECT cast(event_data as XML) as event_data
FROM sys.fn_xe_file_target_read_file(@FilePathNameToImport, null, null, null)


-- parse out the data needed (only columns using) and insert into temp table for parsed data
INSERT INTO #CaptureObjectUsageEventData (EventTime, EventObjectType, EventObjectName)
SELECT n.value('(@timestamp)[1]', 'datetime') AS [utc_timestamp],
n.value('(data[@name="object_type"]/text)[1]', 'varchar(500)') AS ObjectType,
n.value('(data[@name="object_name"]/value)[1]', 'varchar(500)') as ObjectName
from (
    SELECT event_data
    FROM #CaptureObjectUsageFileData (NOLOCK)
) ed
CROSS apply ed.event_data.nodes('event') as q(n)


-- select from temp table as another step for speed/conversion
--  converting the timestamp to smalldatetime so it doesnt get miliseconds so when we select distinct it wont have lots of dupes
INSERT INTO DBALocal.dbo.DBObjectUsageTracking(DatabaseID, ObjectType, ObjectName, ObjectUsageDateTime)
SELECT DISTINCT @DBID, EventObjectType, EventObjectName, CAST(EventTime AS SMALLDATETIME)
FROM #CaptureObjectUsageEventData
WHERE EventTime > @LastRunDateTime

3 ответа

Решение

Хорошо, я уже разместил комментарий, но - подумав немного глубже и изучив ваш код - это может быть довольно просто:

Вы можете сохранить время вашего последнего импорта и использовать предикат в .nodes() (как вы делаете это в .value() чтобы получить правильный <data>-элемент).

Попробуйте что-то вроде этого:

DECLARE @LastImport DATETIME=GETDATE(); --put the last import's time here

and then

CROSS apply ed.event_data.nodes('event[@timestamp cast as xs:dateTime? > sql:variable("@LastImport")]') as q(n)

Делать это, .nodes() должен вернуться только <event>-элементы, где условие выполнено. Если это не помогает, пожалуйста, покажите несколько сокращенных примеров XML и того, что вы хотите получить.

Принял ответ выше, но разместил код для раздела, по которому у меня были вопросы, полностью с обновлениями из комментариев / исправлений, которые я сделал (опять же не весь код), но важными частями. С помощью справки @Shnugo я смог полностью удалить временную таблицу из своего процесса, которая была необходима для фильтрации даты перед вставкой в ​​постоянную таблицу, с его ответом я мог просто вставить ее непосредственно в постоянную таблицу. В моем тестировании небольших наборов данных обновление и удаление дополнительного кода сократили время выполнения на 1/3. Чем больше данных я получу, тем большее влияние окажет это улучшение.

Это предназначено для запуска сеанса расширенного события в течение длительного периода времени. Он скажет мне, какие объекты используются (для последующего запроса к системным таблицам), чтобы сказать, какие из них НЕ используются. См. Код генерации расширенных событий ниже: я собираю информацию о: sp_statement_starting и собираю только события SP и функции и сохраняю только имя объекта, тип и метку времени. Я НЕ сохраняю текст SQL, потому что он не нужен для моей цели.

Процедура sp_statement_starting извлекает каждый оператор внутри хранимой процедуры, поэтому при запуске SP он может иметь от 1 до 100 операторов, начинающих события, и вставлять столько файлов в файл (что намного больше данных, чем необходимо для моих целей).

В моем коде после импорта файла в промежуточную таблицу я сокращаю метку времени до shortdatetime и выбираю отличительные значения из всех записей в файле.

Я делаю это, потому что он вставляет запись для каждого оператора внутри SP, сокращая данные до короткого времени и выбирая отличные, значительно уменьшает количество вставленных записей.

Я знаю, что могу просто сохранить имя объекта и вставлять только уникальные значения и полностью игнорировать время, но я хочу приблизительно видеть, как часто они вызываются.

CREATE EVENT SESSION [CaptureObjectUsage_SubmissionEngine] ON SERVER 
ADD EVENT sqlserver.sp_statement_starting(
    -- collect object name but NOT statement, thats not needed
    SET collect_object_name=(1),
    collect_statement=(0)
    WHERE (
    -- this is for functions or SP's
        (
            -- functions
            [object_type]=(8272) 
            -- SProcs
            OR [object_type]=(20038)
        ) 
        AND [sqlserver].[database_name]=N'DBNAMEHERE' 
        AND [sqlserver].[is_system]=(0))
    ) 
ADD TARGET package0.event_file( 
    SET filename=N'c:\Path\CaptureObjectUsage.xel'  -- mine that was default UI gave me
)
WITH (MAX_MEMORY=4096 KB,EVENT_RETENTION_MODE=ALLOW_SINGLE_EVENT_LOSS,MAX_DISPATCH_LATENCY=30 SECONDS,MAX_EVENT_SIZE=0 KB,MEMORY_PARTITION_MODE=NONE,TRACK_CAUSALITY=OFF,STARTUP_STATE=OFF)
GO




-- ***************************************************************************
--      code for importing
-- ***************************************************************************

-- pull data from file path and insert into staging table
INSERT INTO #CaptureObjectUsageFileData (event_data)
SELECT cast(event_data as XML) as event_data
FROM sys.fn_xe_file_target_read_file(@FilePathNameToImport, null, null, null)


-- with the XML.nodes parsing I can insert directly into my final table because it does the logic here
INSERT INTO DBALocal.dbo.DBObjectUsageTracking(DatabaseID, ObjectType, ObjectName, ObjectUsageDateTime)
SELECT DISTINCT @DBID, -- @DBID is variable I set above so I dont need to use DBNAME and take up a ton more space
n.value('(data[@name="object_type"]/text)[1]', 'varchar(500)') AS ObjectType,
n.value('(data[@name="object_name"]/value)[1]', 'varchar(500)') as ObjectName,
CAST(n.value('(@timestamp)[1]', 'datetime') AS SMALLDATETIME) AS [utc_timestamp]
from (
    SELECT event_data
    FROM #CaptureObjectUsageFileData (NOLOCK)
) ed
-- original  before adding the .node logic
--CROSS apply ed.event_data.nodes('event') as q(n)
-- updated to reduce amount of data to import
CROSS apply ed.event_data.nodes('event[@timestamp cast as xs:dateTime? > sql:variable("@LastRunDateTime")]') as q(n)

старый вопрос, но так как никто не предложил решение с использованием initial_offsetпараметр для sys.fn_xe_file_target_read_file, я добавлю немного кода о том, как я использовал его несколько лет назад. Я думаю, что это не рабочее решение, потому что я вырезал и вставил его из большей базы кода, но оно показывает все, что необходимо для его работы.

      -- table to hold the config, i.e. the last file read and the offset.
IF OBJECT_ID('session_data_reader_config', 'U') IS NULL 
CREATE TABLE session_data_reader_config
(
    lock                  bit           PRIMARY KEY 
                                        DEFAULT 1 
                                        CHECK(lock=1) -- to allow only one record in the table
    , file_target_path      nvarchar(260)
    , last_file_read        nvarchar(260)
    , last_file_read_offset bigint
    , file_exists           AS dbo.fn_file_exists(last_file_read)
)


-- Insert the default value to start reading the log files, if no values are already present.
IF NOT EXISTS(SELECT 1 FROM  session_data_reader_config )
INSERT INTO session_data_reader_config (file_target_path,last_file_read,last_file_read_offset)
VALUES ('PathToYourFiles*.xel',NULL,NULL)


-- import the EE data into the staging table
IF EXISTS(SELECT 1 FROM [session_data_reader_config] WHERE file_exists = 1 )
BEGIN
    INSERT INTO [staging_table] ([file_name], [file_offset], [data])
    SELECT t2.file_name, t2.file_offset, t2.event_data --, CAST(t2.event_data as XML)
    FROM [session_data_reader_config]
    CROSS APPLY sys.fn_xe_file_target_read_file(file_target_path,NULL, last_file_read, last_file_read_offset) t2
END 
ELSE
BEGIN
    INSERT INTO [staging_table] ([file_name], [file_offset], [data])
    SELECT t2.file_name, t2.file_offset, t2.event_data
    FROM [session_data_reader_config]
    CROSS APPLY sys.fn_xe_file_target_read_file(file_target_path,NULL, NULL, NULL) t2
END


-- update the config table with the last file and offset
UPDATE [session_data_reader_config]
    SET [last_file_read]        = T.[file_name]
      , [last_file_read_offset] = T.[file_offset]
    FROM (
            SELECT TOP (1) 
                   [file_name]
                 , [file_offset]
              FROM [staging_table]
          ORDER BY [id] DESC 
    ) AS T ([file_name], [file_offset])
Другие вопросы по тегам