Как устранить неполадки с автоматическим захватом Snowpipe?

Я пытаюсь загрузить все новые файлы из ведра AWS S3 в зависимости от его путей к двум таблицам Snowflake, но мне пока не удалось добиться успеха даже с одной таблицей. Что я пробовал:

Создал этап:

CREATE or replace STAGE DATA_SCIENCE.INFRA.jobs_resource_usage URL = 's3://om/jobs-resource-usage/'
  storage_integration = om_s3 FILE_FORMAT=(TYPE='JSON');

Создал таблицу:

create or replace TABLE DATA_SCIENCE.INFRA.job_metrics (
  job_name STRING,
  build_number INT,
  cpu_perc INT,
  mem BIGINT,
  "timestamp" TIMESTAMP
  );

Создал трубу:

create or replace pipe DATA_SCIENCE.INFRA.job_metrics auto_ingest=true as
    copy into DATA_SCIENCE.INFRA.job_metrics
        from (select
            REGEXP_SUBSTR(METADATA$FILENAME,'job_name=(.*)/',1, 1, 'e', 1),
            REGEXP_SUBSTR(METADATA$FILENAME,'build_number=([0-9]+)',1, 1, 'e', 1),
            $1:cpu_perc::INT,
            $1:mem::BIGINT,
            $1:timestamp::TIMESTAMP
        from @DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/);

Добавлен SQS ARN в событие корзины с:

  • префикс: jobs_resource_usage/
  • суффикс: .json
  • отправить в: очередь SQS
  • Очередь SQS ARN: та, которая select parse_json(SYSTEM$PIPE_STATUS('DATA_SCIENCE.INFRA.job_metrics')):notificationChannelName; вернулся

Этап работает, потому что я могу перечислить файлы, например:

ls '@DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/job_name=Ingest job';

Что возвращает имена файлов S3, например (включил образец, чтобы увидеть его формат):

s3://om/jobs-resource-usage/metrics/job_name=Ingest job/build_number=144.json

Я могу успешно загрузить файл вручную с помощью:

copy into DATA_SCIENCE.INFRA.job_metrics
    from (select
          REGEXP_SUBSTR(METADATA$FILENAME,'job_name=(.*)/',1, 1, 'e', 1),
          REGEXP_SUBSTR(METADATA$FILENAME,'build_number=([0-9]+)',1, 1, 'e', 1),
        $1:cpu_perc::INT,
        $1:mem::BIGINT,
        $1:timestamp::TIMESTAMP
        from @DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/)
        files=('job_name=Ingest job/build_number=144.json');

Однако труба ничего не загружает. Если я сделаю

select SYSTEM$PIPE_STATUS('DATA_SCIENCE.INFRA.job_metrics');

Я вижу, что он получает уведомления:

{"executionState":"RUNNING","pendingFileCount":0,"notificationChannelName":"arn:aws:sqs:us-west-2:494544507972:sf-snowpipe-concealed","numOutstandingMessagesOnChannel":7,"lastReceivedMessageTimestamp":"2020-08-13T09:59:21.107Z"}

но я не вижу ничего lastForwardedMessageTimestampзаписей, что предполагает наличие проблемы с сопоставлением пути? Я пробовал несколько перестановок с ведущей косой чертой и загружал файлы прямо вmetrics путь, без пробелов или =с, безуспешно.

Что я сделал не так, как я мог понять, в чем проблема?

2 ответа

Решение

Просмотрите, какие этапы указывают на корзины S3. Наличие нескольких этапов на разных уровнях детализации может вызвать конфликты чтения очередей сообщений. Если канал работает правильно и видит сообщения, вы увидите lastForwardedMessageTimestamp, как вы упомянули. Если вы этого не видите, значит, в вашей очереди нет сообщений, или канал неправильно читает очередь, или существует конфликт, и что-то еще сначала читает сообщения очереди. Есть ли у вас доступ для проверки журналов очереди SQS, чтобы убедиться, что сообщения отображаются в первую очередь и что ваша очередь работает? Если ваша очередь работает правильно, я бы дважды проверил, есть ли у вас права доступа к очереди, и что у вас нет нескольких этапов, конфликтующих в вашей интеграции и очереди.

Похоже, вам может не хватать file_format = (type = 'JSON') в конце оператора создания канала

Кроме того, согласно документации, вам может потребоваться установить aws_sns_topic= '' в определении канала также

Другие вопросы по тегам