Snowflake с использованием потоков для отслеживания обновлений / удалений в таблице

Мне сложно понять, как работают Streams с точки зрения отслеживания изменений. Я хотел бы создать таблицу истории, в которой отслеживаются всеUPDATE а также DELETE к столу, но я обнаружил, что не понимаю, как это работает.

Если у меня есть стол Table1 с потоком:

 CREATE TABLE Table1
 (
   XID INT IDENTITY PRIMARY KEY,
   FIELD1 INT,
   FIELD2 STRING,
   DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
 );

 CREATE STREAM Table1_History ON TABLE Table1;

Если я вставлю данные:

INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;

Затем запустите:

SELECT * FROM Table1_History;

Он возвращает следующее:

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   101 String1 2020-08-13 06:52:34.402 INSERT  FALSE   23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2   102 String2 2020-08-13 06:52:34.402 INSERT  FALSE   5b5e429cf3a174303b2f2192b5d602ed9dedd865

Все идет нормально.

Но если я побегу:

UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;

Затем выберите из Table1_History, Я получил:

SELECT * FROM Table1_History;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   1001    String1 2020-08-13 06:52:34.402 INSERT  FALSE   23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2   102 String2 2020-08-13 06:52:34.402 INSERT  FALSE   5b5e429cf3a174303b2f2192b5d602ed9dedd865

В METADATA$ACTION все еще INSERT, а FIELD1 значение теперь хранится в потоке как 1001. Больше нет записи, я вижу, что строка имела значение101 и что он был обновлен.

Если я запустил следующее:

DELETE FROM Table1 WHERE XID = 2;

Теперь поток возвращается:

SELECT * FROM Table1_History;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   1001    String1 2020-08-13 06:52:34.402 INSERT  FALSE   23bc7a4d83522484f4d7e36edf84b4c7986dfa9b

Сейчас я могу видеть 0 записей в потоке второй строки, когда-либо находящейся в базе данных.

Я не понимаю суть таблицы Stream для отслеживания UPDATES/DELETES. Разве это не использование потоков?

Я пробовал следовать этому: Snowflake Streams Made Simple, но до сих пор не понимаю.

3 ответа

Решение

Процитируем документацию Snowflake: "Поток хранит текущую транзакционную версию таблицы и является подходящим источником записей CDC в большинстве сценариев".

Взгляните на этот пример в документации Snowflake: https://docs.snowflake.com/en/user-guide/streams.html

Насколько я понимаю, поток будет содержать только текущую версию записи, пока вы не увеличите смещение. Поэтому, если вы вставляете запись, а затем обновляете ее, прежде чем продвигать смещение, тогда она покажет одну вставку, но поля будут содержать самые последние значения.

Если вы затем продвинете смещение и обновите или удалите запись, эти события будут отображаться в потоке - хотя, если вы обновили, а затем удалили ту же запись (перед продвижением смещения), поток просто отобразит удаление, так как это последняя позиция для этой записи.

ОБНОВЛЕНИЕ 1 Похоже, вы пытаетесь реализовать отслеживание аудита для каждого изменения, внесенного в запись в таблице - это не то, для чего предназначены потоки, и я не думаю, что вы сможете реализовать решение, используя потоки, это гарантировало регистрацию каждого изменения.

Если вы читаете документацию Streams, в ней говорится: "Поток может предоставить набор изменений от текущего смещения до текущего времени транзакции исходной таблицы (то есть текущей версии таблицы). Поток поддерживает только дельту изменений; если несколько операторов DML изменяют строку, поток содержит только последнее действие, выполненное с этой строкой ".

CDC - это терминология, специально относящаяся к загрузке хранилищ данных, и никогда не подразумевается как общий термин для фиксации каждого изменения, внесенного в запись.

Если вы хотите создать настоящую возможность аудита в Snowflake, я боюсь, что не знаю, возможно ли это. Функция путешествия во времени показывает, что Snowflake сохраняет все изменения, внесенные в запись (в течение периода хранения), но я не знаю, как получить доступ только к этим изменениям; Я думаю, вы можете получить доступ к истории записи только в определенные моменты времени, и у вас нет возможности узнать, в какое время были внесены какие-либо изменения

ОБНОВЛЕНИЕ 2 Только что понял, что Snowflake позволяет отслеживать изменения в таблице без необходимости использования потоков. Вероятно, это лучшее решение, если вы хотите фиксировать все изменения в таблице, а не только последнюю версию. Функциональность документирована здесь:https://docs.snowflake.com/en/sql-reference/constructs/changes.html

Хорошо, поскольку @NickW заявил, что таблица потоков больше предназначена для отслеживания изменений между смещениями. Это означает, что я все еще могу делать то, что хочу, но для этого потребуется явноеINSERT в таблицу истории между DML операции.

Сначала создайте основную таблицу, поток и таблицу истории:

CREATE TABLE Table1
(
   XID INT IDENTITY PRIMARY KEY,
   FIELD1 INT,
   FIELD2 STRING,
   DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);

CREATE STREAM Table1_Stream ON TABLE Table1;

CREATE TABLE Table1_History
(
   UID INT IDENTITY PRIMARY KEY,
   XID INT,
   FIELD1 INT,
   FIELD2 STRING,
   DATECREATED TIMESTAMP,
   METADATA$ACTION STRING,   --METADATA Column from Stream
   METADATA$ISUPDATE STRING, --METADATA Column from Stream
   DATEINSERTED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);

потом INSERT записи:

INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;

Таблица потоков теперь гласит:

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   101    String1 2020-08-13 06:52:34.402 INSERT  FALSE   23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2   102 String2 2020-08-13 06:52:34.402 INSERT  FALSE   5b5e429cf3a174303b2f2192b5d602ed9dedd865

Затем сделайте INSERT из таблицы Streams в таблицу History:

INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
                           METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
       METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'

Обратите внимание WHERE пункт, исключающий INSERT только записи, если они не являются частью UPDATE, который DELETEs тогда INSERTs запись.

Теперь, несмотря на то, что записи потока фактически не были вставлены в таблицу истории из-за WHERE предложение, если вы запросите Stream, вы получите NULL:

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID

Теперь, если вы сделаете UPDATE поток покажет это:

UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   1001    String1 2020-08-14 09:11:20.173 INSERT  TRUE    93256f240f338581cc4781c2e79a28075e1b66d7
1   101 String1 2020-08-14 09:11:20.173 DELETE  TRUE    93256f240f338581cc4781c2e79a28075e1b66d7

Теперь запустим вставку:

INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
                           METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
       METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'

SELECT * FROM Table1_History;

UID XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   DATEINSERTED
1   1   1001    String1 2020-08-14 09:11:20.173 INSERT  TRUE    2020-08-14 09:13:41.474
2   1   101 String1 2020-08-14 09:11:20.173 DELETE  TRUE    2020-08-14 09:13:41.474

И поток снова равен NULL:

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID

Если вы запустите DELETE, изменение снова отражается в Stream:

DELETE FROM Table1 WHERE XID = 2

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
2   102 String2 2020-08-14 09:11:20.173 DELETE  FALSE   51d1d0f5c5bf9c328d79cbbd54a10bf99f73bcd3

И это может быть INSERT в таблицу истории:

INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
                           METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
       METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'

SELECT * FROM Table1_History;

UID XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   DATEINSERTED
1   1   1001    String1 2020-08-14 09:11:20.173 INSERT  TRUE    2020-08-14 09:13:41.474
2   1   101 String1 2020-08-14 09:11:20.173 DELETE  TRUE    2020-08-14 09:13:41.474
4   2   102 String2 2020-08-14 09:11:20.173 DELETE  FALSE   2020-08-14 09:17:37.694

И поток снова равен NULL:

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID

Единственное, чего я не понимаю, это почему UID на Table1_History увеличено до 4 вместо того 3, но это тривиальная проблема.

Так я буду отслеживать все исторические изменения в Snowflake.

Потоки - это не таблицы.

Поток измеряет только дельту с момента последнего использования (или первоначального определения) потока. Что вы сейчас видите, так это то, что вы просто продолжаете перезаписывать данные, прежде чем сделать еще один снимок через поток. Начальные значения вставки - это то, что изменилось с момента последнего захвата данных. Путем списания вставленных значений в другую таблицу создается новый моментальный снимок, из которого можно измерить изменения.

Предлагаю вам создать таблицу: создать таблицу Table_history (...)

сделайте ваши первоначальные вставки в table1 и посмотрите на поток:(выберите * from Table1_History);

Теперь вставьте в свою таблицу истории из СТРИМ, который вы определили:

вставить в table_history (...) выбрать * из Table1_History;

Снова проверьте поток: выберите * из Table1_History. Данные должны исчезнуть. Ничего такого.

Затем выполните обновление:UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;

Еще раз проверьте поток: выберите * из Table1_History

Другие вопросы по тегам