Коннектор SnowFlake-Kafka -> Посадочный стол -> Целевой стол. Как очистить посадочный стол

Я изучаю данные из Kafka -> SnowFlake/Kafka connector -> SnowFlake. К сожалению, коннектор, похоже, использует только два столбца (и помещает всю полезную нагрузку JSON в один столбец). Поэтому я создал поток / задачу для периодического копирования данных из целевой таблицы в целевую таблицу (с помощью вставки). Все работает прекрасно, за исключением удаления данных из целевой таблицы после того, как они попали в целевую таблицу. Используя потоки, я знаю, что приземлилось. Как мне удалить остальные данные? Truncate кажется намного быстрее. Я просто периодически запускаю задачу удаления, которая удаляет эти записи? Меня также беспокоит складское время для выполнения этих удалений. Благодарность

1 ответ

Для случая использования, когда несколько операторов (например, вставка, удаление и т.д.) для доступа к одним и тем же записям изменений, заключите их в явный оператор транзакции (Begin..Commit), который заблокирует поток.

У вас может быть дополнительный столбец, такой как Flag, заблокировать поток с помощью Begin, использовать поток для вставки в целевую таблицу из промежуточной таблицы, использовать поток для выполнения второго слияния с промежуточной таблицей, чтобы отметить столбец Flag.

https://docs.snowflake.com/en/user-guide/streams.html

begin;

select * from <stream>;

insert into target_table select columns from <stream> where metadata$action='INSERT' and flag=0;

merge into staging_table st
using (
select column
  from stream
  where flag = 0) sc
  on st.column=sc.column
  when matched then update set st.flag=1;
commit;
delete from staging_table where flag=1;

Другие вопросы по тегам