Как правильно обрезать промежуточную таблицу в конвейере ETL?

У нас есть конвейер ETL, который запускается для каждого CSV, загруженного в учетную запись хранения (Azure). Он выполняет некоторые преобразования в CSV и записывает выходные данные в другое место, также как CSV, и вызывает хранимую процедуру в базе данных (SQL Azure), которая принимает (BULK INSERT) полученный CSV в промежуточную таблицу.

Этот конвейер может иметь одновременное выполнение, поскольку несколько ресурсов могут загружать файлы в хранилище. Следовательно, промежуточная таблица довольно часто вставляет данные.

Затем у нас есть запланированное задание SQL (эластичное задание), которое запускает SP, который перемещает данные из промежуточной таблицы в конечную таблицу. На этом этапе мы хотели бы усечь / очистить промежуточную таблицу, чтобы не вставлять их повторно при следующем выполнении задания.

Проблема в том, что мы не можем быть уверены, что между загрузкой из промежуточной таблицы в конечную таблицу и командой усечения не было никаких новых данных, записанных в промежуточную таблицу, которые можно было бы усечь без предварительной вставки в конечную таблицу.

Есть ли способ заблокировать промежуточную таблицу, пока мы копируем данные в конечную таблицу, чтобы SP (вызываемый из конвейера ETL), пытающийся записать в нее, просто ждал, пока блокировка не будет снята? Возможно ли это с помощью транзакций или, возможно, некоторых команд ручной блокировки?

Если нет, то как лучше всего с этим справиться?

5 ответов

Решение

Я бы предложил решение с двумя одинаковыми промежуточными таблицами. Назовем их StageLoading и StageProcessing.
Процесс загрузки будет состоять из следующих шагов:
1. Вначале обе таблицы пусты.
2. Мы загружаем некоторые данные в таблицу StageLoading (я предполагаю, что каждая загрузка - это транзакция).
3. Когда запускается задание Elastic, оно выполняет:
- ALTER TABLE SWITCH, чтобы переместить все данные из StageLoading в StageProcessing. Это сделает StageLoading пустым и готовым к следующей загрузке. Это операция с метаданными, поэтому она занимает миллисекунды и полностью блокируется, поэтому будет выполняться между загрузками.
- загрузить данные из StageProcessing в финальные таблицы.
- усечь таблицу StageProcessing.
4. Теперь мы готовы к следующей эластичной работе.

Если мы попытаемся выполнить SWITCH, когда StageProcessing не пуст, ALTER завершится ошибкой, и это будет означать, что последний процесс загрузки завершился неудачно.

Я люблю sp_getapplock и сам использую этот метод в нескольких местах из-за его гибкости и полного контроля над логикой блокировки и временем ожидания.

Единственная проблема, которую я вижу, заключается в том, что в вашем случае не все параллельные процессы равны.

У вас есть пакет обновления 1 (SP1), который перемещает данные из промежуточной таблицы в основную таблицу. Ваша система никогда не пытается запустить несколько экземпляров этого SP.

Другой SP2, который вставляет данные в промежуточную таблицу, может запускаться несколько раз одновременно, и это нормально.

Легко реализовать блокировку, которая предотвратит одновременный запуск любой комбинации SP1 или SP2. Другими словами, это просто, если логика блокировки одинакова для SP1 и SP2, и они рассматриваются одинаково. Но тогда у вас не может быть одновременно запущено несколько экземпляров SP2.

Не очевидно, как реализовать блокировку, которая предотвратила бы одновременный запуск SP1 и SP2, позволяя одновременно запускать несколько экземпляров SP2.


Есть другой подход, который не пытается предотвратить одновременный запуск SP, но предполагает и ожидает, что одновременные запуски возможны.

Один из способов сделать это - добавить IDENTITYстолбец в промежуточную таблицу. Или автоматически заполняемое datetime, если вы можете гарантировать, что оно уникально и никогда не уменьшается, что может быть непросто. Или rowversion столбец.

Логика внутри SP2, которая вставляет данные в промежуточную таблицу, не меняется.

Логика внутри SP1, которая перемещает данные из промежуточной таблицы в основную таблицу, должна использовать эти значения идентификаторов.

Сначала прочтите текущее максимальное значение идентификатора из промежуточной таблицы и запомните его в переменной, скажем, @MaxID. Все последующие операции SELECT, UPDATE и DELETE из промежуточной таблицы в этом SP1 должны включать фильтрWHERE ID <= @MaxID.

Это гарантирует, что если во время работы SP1 в промежуточную таблицу будет добавлена ​​новая строка, эта строка не будет обработана и останется в промежуточной таблице до следующего запуска SP1.

Недостатком такого подхода является то, что вы не можете использовать TRUNCATE, вам нужно использовать DELETE с участием WHERE ID <= @MaxID.


Если вас устраивает несколько экземпляров SP2, ожидающих друг друга (и SP1), вы можете использовать sp_getapplockаналогично следующему. У меня есть этот код в моей хранимой процедуре. Вы должны поместить эту логику как в SP1, так и в SP2.

Я не звоню sp_releaseapplock здесь явно, потому что владелец блокировки установлен на транзакцию, и механизм автоматически снимет блокировку после завершения транзакции.

Вам не нужно помещать логику повтора в хранимую процедуру, она может быть во внешнем коде, который запускает эти хранимые процедуры. В любом случае ваш код должен быть готов к повторной попытке.

CREATE PROCEDURE SP2  -- or SP1
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    BEGIN TRANSACTION;
    BEGIN TRY
        -- Maximum number of retries
        DECLARE @VarCount int = 10;

        WHILE (@VarCount > 0)
        BEGIN
            SET @VarCount = @VarCount - 1;

            DECLARE @VarLockResult int;
            EXEC @VarLockResult = sp_getapplock
                @Resource = 'StagingTable_app_lock',
                -- this resource name should be the same in SP1 and SP2
                @LockMode = 'Exclusive',
                @LockOwner = 'Transaction',
                @LockTimeout = 60000,
                -- I'd set this timeout to be about twice the time
                -- you expect SP to run normally
                @DbPrincipal = 'public';

            IF @VarLockResult >= 0
            BEGIN
                -- Acquired the lock

                -- for SP2
                -- INSERT INTO StagingTable ...

                -- for SP1
                -- SELECT FROM StagingTable ...
                -- TRUNCATE StagingTable ...

                -- don't retry any more
                BREAK;
            END ELSE BEGIN
                -- wait for 5 seconds and retry
                WAITFOR DELAY '00:00:05';
            END;
        END;

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        ROLLBACK TRANSACTION;
        -- log error
    END CATCH;

END

Этот код гарантирует, что в любой момент времени с промежуточной таблицей работает только одна процедура. Параллелизма нет. Все остальные экземпляры будут ждать.

Очевидно, что если вы попытаетесь получить доступ к промежуточной таблице не через эти SP1 или SP2 (которые сначала попытаются получить блокировку), то такой доступ не будет заблокирован.

Есть ли способ заблокировать промежуточную таблицу, пока мы копируем данные в конечную таблицу, чтобы SP (вызываемый из конвейера ETL), пытающийся записать в нее, просто ждал, пока блокировка не будет снята? Возможно ли это с помощью транзакций или, возможно, некоторых команд ручной блокировки?

Похоже, вы ищете механизм, который шире уровня транзакции. У SQL Server/Azure SQL DB есть один, и он называется блокировкой приложения:

sp_getapplock

Устанавливает блокировку на ресурс приложения.

Блокировки, наложенные на ресурс, связаны либо с текущей транзакцией, либо с текущим сеансом. Блокировки, связанные с текущей транзакцией, снимаются, когда транзакция фиксируется или откатывается.Блокировки, связанные с сеансом, снимаются при выходе из сеанса. Когда сервер выключается по какой-либо причине, все блокировки снимаются.

Блокировки можно явно снять с помощью процедуры sp_releaseapplock. Когда приложение вызывает sp_getapplock несколько раз для одного и того же ресурса блокировки, sp_releaseapplock необходимо вызывать одинаковое количество раз, чтобы снять блокировку. Когда блокировка открывается владельцем блокировки транзакции, эта блокировка снимается, когда транзакция фиксируется или откатывается.

Это в основном означает, что ваш инструмент ETL должен открывать один сеанс для БД, получать блокировку и освобождать по завершении. Другие сеансы, прежде чем пытаться что-либо сделать, должны попытаться получить блокировку (они не могут, потому что она уже занята), дождаться, когда она будет снята, и продолжить работу.

Предполагая, что у вас есть одна исходящая работа

  • Добавьте в таблицу БИТ ПО УМОЛЧАНИЮ 0 для исходящей обработки.
  • В задании SET OutboundProcessing = 1 WHERE OutboundProcessing = 0 (требовать строки)
  • Для ETL включите WHERE OutboundProcessing = 1 в запрос, который является источником данных (передача строк)
  • После ETL УДАЛИТЬ ИЗ ТАБЛИЦЫ, ГДЕ OutboundProcessing = 1 (удалить строки, которые вы передали)
  • Если ETL не работает, SET OutboundProcessing = 0 WHERE OutboundProcessing = 1

Я всегда предпочитаю "идентифицировать" каждый полученный файл. Если вы можете это сделать, вы можете связать записи из данного файла в процессе загрузки. Вы не говорили о необходимости в этом, но просто говорите.

Однако, если у каждого файла есть идентификатор (должно быть только значение идентификатора int/bigint), вы можете динамически создавать столько таблиц загрузки, сколько захотите, из таблицы загрузки "шаблона".

  1. Когда поступит файл, создайте новую таблицу загрузки с именем с идентификатором файла.
  2. Обработайте данные от загрузки до финальной таблицы.
  3. отбросить таблицу загрузки для обрабатываемого файла.

Это несколько похоже на другое решение об использовании 2 таблиц (загрузка и этап), но даже в этом решении вы по-прежнему ограничены "загруженными" двумя файлами (хотя вы все еще применяете только один файл к финальной таблице?)

Наконец, неясно, отсоединена ли ваша "Эластичная работа" от фактического конвейера / обработки "загрузки" или она включена. Будучи работой, я предполагаю, что она не включена, если задание, вы можете запускать только один экземпляр за раз? Поэтому непонятно, почему так важно загружать сразу несколько файлов, если вы можете перемещать только один из загрузки в финал за раз. Зачем спешить с загрузкой файлов?

Другие вопросы по тегам