Snowflake с использованием потоков для отслеживания обновлений / удалений в таблице
Мне сложно понять, как работают Streams с точки зрения отслеживания изменений. Я хотел бы создать таблицу истории, в которой отслеживаются всеUPDATE
а также DELETE
к столу, но я обнаружил, что не понимаю, как это работает.
Если у меня есть стол Table1
с потоком:
CREATE TABLE Table1
(
XID INT IDENTITY PRIMARY KEY,
FIELD1 INT,
FIELD2 STRING,
DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
CREATE STREAM Table1_History ON TABLE Table1;
Если я вставлю данные:
INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;
Затем запустите:
SELECT * FROM Table1_History;
Он возвращает следующее:
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 101 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
Все идет нормально.
Но если я побегу:
UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
Затем выберите из Table1_History
, Я получил:
SELECT * FROM Table1_History;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
В METADATA$ACTION
все еще INSERT
, а FIELD1
значение теперь хранится в потоке как 1001
. Больше нет записи, я вижу, что строка имела значение101
и что он был обновлен.
Если я запустил следующее:
DELETE FROM Table1 WHERE XID = 2;
Теперь поток возвращается:
SELECT * FROM Table1_History;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
Сейчас я могу видеть 0 записей в потоке второй строки, когда-либо находящейся в базе данных.
Я не понимаю суть таблицы Stream для отслеживания UPDATES/DELETES
. Разве это не использование потоков?
Я пробовал следовать этому: Snowflake Streams Made Simple, но до сих пор не понимаю.
3 ответа
Процитируем документацию Snowflake: "Поток хранит текущую транзакционную версию таблицы и является подходящим источником записей CDC в большинстве сценариев".
Взгляните на этот пример в документации Snowflake: https://docs.snowflake.com/en/user-guide/streams.html
Насколько я понимаю, поток будет содержать только текущую версию записи, пока вы не увеличите смещение. Поэтому, если вы вставляете запись, а затем обновляете ее, прежде чем продвигать смещение, тогда она покажет одну вставку, но поля будут содержать самые последние значения.
Если вы затем продвинете смещение и обновите или удалите запись, эти события будут отображаться в потоке - хотя, если вы обновили, а затем удалили ту же запись (перед продвижением смещения), поток просто отобразит удаление, так как это последняя позиция для этой записи.
ОБНОВЛЕНИЕ 1 Похоже, вы пытаетесь реализовать отслеживание аудита для каждого изменения, внесенного в запись в таблице - это не то, для чего предназначены потоки, и я не думаю, что вы сможете реализовать решение, используя потоки, это гарантировало регистрацию каждого изменения.
Если вы читаете документацию Streams, в ней говорится: "Поток может предоставить набор изменений от текущего смещения до текущего времени транзакции исходной таблицы (то есть текущей версии таблицы). Поток поддерживает только дельту изменений; если несколько операторов DML изменяют строку, поток содержит только последнее действие, выполненное с этой строкой ".
CDC - это терминология, специально относящаяся к загрузке хранилищ данных, и никогда не подразумевается как общий термин для фиксации каждого изменения, внесенного в запись.
Если вы хотите создать настоящую возможность аудита в Snowflake, я боюсь, что не знаю, возможно ли это. Функция путешествия во времени показывает, что Snowflake сохраняет все изменения, внесенные в запись (в течение периода хранения), но я не знаю, как получить доступ только к этим изменениям; Я думаю, вы можете получить доступ к истории записи только в определенные моменты времени, и у вас нет возможности узнать, в какое время были внесены какие-либо изменения
ОБНОВЛЕНИЕ 2 Только что понял, что Snowflake позволяет отслеживать изменения в таблице без необходимости использования потоков. Вероятно, это лучшее решение, если вы хотите фиксировать все изменения в таблице, а не только последнюю версию. Функциональность документирована здесь:https://docs.snowflake.com/en/sql-reference/constructs/changes.html
Хорошо, поскольку @NickW заявил, что таблица потоков больше предназначена для отслеживания изменений между смещениями. Это означает, что я все еще могу делать то, что хочу, но для этого потребуется явноеINSERT
в таблицу истории между DML
операции.
Сначала создайте основную таблицу, поток и таблицу истории:
CREATE TABLE Table1
(
XID INT IDENTITY PRIMARY KEY,
FIELD1 INT,
FIELD2 STRING,
DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
CREATE STREAM Table1_Stream ON TABLE Table1;
CREATE TABLE Table1_History
(
UID INT IDENTITY PRIMARY KEY,
XID INT,
FIELD1 INT,
FIELD2 STRING,
DATECREATED TIMESTAMP,
METADATA$ACTION STRING, --METADATA Column from Stream
METADATA$ISUPDATE STRING, --METADATA Column from Stream
DATEINSERTED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
потом INSERT
записи:
INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;
Таблица потоков теперь гласит:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 101 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
Затем сделайте INSERT
из таблицы Streams в таблицу History:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
Обратите внимание WHERE
пункт, исключающий INSERT
только записи, если они не являются частью UPDATE
, который DELETEs
тогда INSERTs
запись.
Теперь, несмотря на то, что записи потока фактически не были вставлены в таблицу истории из-за WHERE
предложение, если вы запросите Stream, вы получите NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
Теперь, если вы сделаете UPDATE
поток покажет это:
UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 93256f240f338581cc4781c2e79a28075e1b66d7
1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 93256f240f338581cc4781c2e79a28075e1b66d7
Теперь запустим вставку:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
SELECT * FROM Table1_History;
UID XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE DATEINSERTED
1 1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 2020-08-14 09:13:41.474
2 1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 2020-08-14 09:13:41.474
И поток снова равен NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
Если вы запустите DELETE
, изменение снова отражается в Stream:
DELETE FROM Table1 WHERE XID = 2
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
2 102 String2 2020-08-14 09:11:20.173 DELETE FALSE 51d1d0f5c5bf9c328d79cbbd54a10bf99f73bcd3
И это может быть INSERT
в таблицу истории:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
SELECT * FROM Table1_History;
UID XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE DATEINSERTED
1 1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 2020-08-14 09:13:41.474
2 1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 2020-08-14 09:13:41.474
4 2 102 String2 2020-08-14 09:11:20.173 DELETE FALSE 2020-08-14 09:17:37.694
И поток снова равен NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
Единственное, чего я не понимаю, это почему UID
на Table1_History
увеличено до 4
вместо того 3
, но это тривиальная проблема.
Так я буду отслеживать все исторические изменения в Snowflake.
Потоки - это не таблицы.
Поток измеряет только дельту с момента последнего использования (или первоначального определения) потока. Что вы сейчас видите, так это то, что вы просто продолжаете перезаписывать данные, прежде чем сделать еще один снимок через поток. Начальные значения вставки - это то, что изменилось с момента последнего захвата данных. Путем списания вставленных значений в другую таблицу создается новый моментальный снимок, из которого можно измерить изменения.
Предлагаю вам создать таблицу: создать таблицу Table_history (...)
сделайте ваши первоначальные вставки в table1 и посмотрите на поток:(выберите * from Table1_History);
Теперь вставьте в свою таблицу истории из СТРИМ, который вы определили:
вставить в table_history (...) выбрать * из Table1_History;
Снова проверьте поток: выберите * из Table1_History. Данные должны исчезнуть. Ничего такого.
Затем выполните обновление:UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
Еще раз проверьте поток: выберите * из Table1_History