Не могу скопировать JSON - Динамо БД Потоки в красное смещение

Ниже приведен пример использования, над которым я работаю: я настроил enable Streams при создании DynamoDB с new and old Image.Я создал Kinesis Firehose delivery stream с назначением как Redshift(Intermediate s3).

Из Dynamodb мой поток достигает Firhose, а оттуда в Bucket в виде JSON (S3 Bucket -Gzip), приведенного ниже. Моя проблема в том, что я cannot COPY this JSON to redshift,

Вещи, которые я не могу получить:

    1. Не уверен, каким должен быть оператор "Создать таблицу" в Redshift
    1. Каким должен быть синтаксис COPY в Kinesis firhose.
    1. Как я должен использовать JsonPaths здесь. Пожарная часть Kinesis Data собирается вернуть только json в мое ведро s3.
    1. Как упомянуть Maniphest в команде COPY

Загрузка JSON в S3 показана ниже:

{
    "Keys": {
        "vehicle_id": {
            "S": "x011"
        }
    },
    "NewImage": {
        "heart_beat": {
            "N": "0"
        },
        "cdc_id": {
            "N": "456"
        },
        "latitude": {
            "N": "1.30951"
        },
        "not_deployed_counter": {
            "N": "1"
        },
        "reg_ind": {
            "N": "0"
        },
        "operator": {
            "S": "x"
        },
        "d_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "z_id": {
            "N": "1267"
        },
        "last_end_trip_dttm": {
            "S": "11/08/2018 1:43:46 PM"
        },
        "land_ind": {
            "N": "1"
        },
        "s_ind": {
            "N": "1"
        },
        "status_change_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "case_ind": {
            "N": "1"
        },
        "last_po_change_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "violated_duration": {
            "N": "20"
        },
        "vehicle_id": {
            "S": "x011"
        },
        "longitude": {
            "N": "103.7818"
        },
        "file_status": {
            "S": "Trip_Start"
        },
        "unhired_duration": {
            "N": "10"
        },
        "eo_lat": {
            "N": "1.2345"
        },
        "reply_eo_ind": {
            "N": "1"
        },
        "license_ind": {
            "N": "0"
        },
        "indiscriminately_parked_ind": {
            "N": "0"
        },
        "eo_lng": {
            "N": "102.8978"
        },
        "officer_id": {
            "S": "xxxx@gmail.com"
        },
        "case_status": {
            "N": "0"
        },
        "color_status_cd": {
            "N": "0"
        },
        "parking_id": {
            "N": "2345"
        },
        "ttr_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "deployed_ind": {
            "N": "1"
        },
        "status": {
            "S": "PI"
        }
    },
    "SequenceNumber": "1200000000000956615967",
    "SizeBytes": 570,
    "ApproximateCreationDateTime": 1535513040,
    "eventName": "INSERT"
}

Моя Создать таблицу Заявление:

create table vehicle_status(
    heart_beat integer,
    cdc_id integer,
    latitude integer,   
    not_deployed_counter integer,
    reg_ind integer,
    operator varchar(10),
    d_dttm varchar(30),
    z_id integer,
    last_end_trip_dttm varchar(30),
    land_ind integer,
    s_ind integer,
    status_change_dttm varchar(30), 
    case_ind integer,
    last_po_change_dttm varchar(30),    
    violated_duration integer,
    vehicle_id varchar(8),
    longitude integer,  
    file_status varchar(30),
    unhired_duration integer,
    eo_lat integer,                     
    reply_eo_ind integer,
    license_ind integer,    
    indiscriminately_parked_ind integer,
    eo_lng integer,
    officer_id varchar(50),
    case_status integer,
    color_status_cd integer,
    parking_id integer,
    ttr_dttm varchar(30),
    deployed_ind varchar(3),
  status varchar(8));

И My Copy Statement (вручную пытаюсь восстановить это из Redshift):

COPY vehicle_status (heart_beat, cdc_id, latitude, not_deployed_counter, reg_ind, operator, d_dttm, z_id, last_end_trip_dttm, land_ind, s_ind, status_change_dttm, case_ind, last_po_change_dttm, violated_duration, vehicle_id, longitude, file_status, unhired_duration, eo_lat, reply_eo_ind, license_ind, indiscriminately_parked_ind, eo_lng, officer_id, case_status, color_status_cd, parking_id, ttr_dttm, deployed_ind, status) 
FROM 's3://<my-bucket>/2018/08/29/05/vehicle_status_change-2-2018-08-29-05-24-42-092c330b-e14a-4133-bf4a-5982f2e1f49e.gz' CREDENTIALS 'aws_iam_role=arn:aws:iam::<accountnum>:role/<RedshiftRole>' GZIP json 'auto';

Когда я пытаюсь описанную выше процедуру - я получаю Вставить записи - но все столбцы и строки равны нулю.

Как я могу скопировать этот формат JSON в Redhsift. Застряли здесь последние 3 дня. Любая помощь по этому вопросу подойдет.

S3 Bucket:

Amazon S3/<My-bucket>/2018/08/29/05
Amazon S3/<My-bucket>/manifests/2018/08/29/05

1 ответ

Я не очень знаком с Amazon, но позвольте мне ответить на большинство ваших вопросов, чтобы вы могли двигаться дальше. Другие люди могут редактировать этот ответ или дополнительную информацию. Спасибо!

Не уверен, каким должен быть оператор "Создать таблицу" в Redshift

Ваше заявление о создании create table vehicle_status(...) не имеет проблем, хотя вы могли бы добавить distribution key, sort key а также encoding в зависимости от ваших требований, обратитесь сюда и сюда

Согласно документам AWS Kenesis, ваша таблица должна присутствовать в Redshiftследовательно, вы можете подключиться к Redshift с помощью psql командовать и запустить create statement вручную.

Каким должен быть синтаксис COPY в Kinesis firhose.

Copy синтаксис останется таким же, либо вы запускаете его через psql или же firhoseК счастью, сценарий копирования, который вы придумали, работает без ошибок, я попробовал его в моем случае с небольшой модификацией прямой AWS/SECRET ключ, а не работает нормально, здесь sql Я запустил, который работал нормально и скопировал 1 запись данных в таблицу vehicle_status,

На самом деле ваша структура пути JSON сложна, следовательно, json 'auto' не будет работать. Вот рабочая команда, я создал образец jsonpath файл для вас с 4 примерами полей, и вы можете следовать той же структуре, чтобы создатьjsonpathфайл со всеми точками данных.

 COPY vehicle_status (heart_beat, cdc_id, operator, status) FROM 's3://XXX/development/test_file.json' CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXXX;aws_secret_access_key=MYXXXXXXXXXXXXXXXXXXXXXX' json 's3://XXX/development/yourjsonpathfile';

И ваш json path file должен иметь содержимое, аналогичное приведенному ниже.

{
  "jsonpaths": [
    "$['NewImage']['heart_beat']['N']",
    "$['NewImage']['cdc_id']['N']",
    "$['NewImage']['operator']['S']",
    "$['NewImage']['status']['S']"
  ]
}

Я проверил это, и это работает.

Как я должен использовать JsonPaths здесь. Пожарная часть Kinesis Data собирается вернуть только json в мое ведро s3.

Я использовал твой пример json только данные, и это работает, поэтому я не вижу здесь проблемы.

Как упомянуть Maniphest в команде COPY

Это хороший вопрос, я мог бы попытаться объяснить это, я надеюсь, что вы имеете в виду menifest,

Если вы видите выше команду копирования, она отлично работает для одного файла или пары файлов, но подумайте, что у вас много файлов, вот вам и концепция menifest, Непосредственно из документов Amazon: "Вместо указания пути к объекту для команды COPY вы указываете имя текстового файла в формате JSON, в котором явно перечисляются файлы для загрузки".

Короче говоря, если вы хотите загрузить несколько файлов за один раз, что также является предпочтительным способом RedshiftВы могли бы создать простой menifest с JSON и поставьте то же самое в команде копирования.

{ "entries": [ {"url":"s3://mybucket-alpha/2013-10-04-custdata", "mandatory":true}, {"url":"s3://mybucket-alpha/2013-10-05-custdata", "mandatory":true},.... ] }

загрузить манифест в S3 и используйте то же самое в вашей команде копирования, как показано ниже.

 COPY vehicle_status (heart_beat, cdc_id, latitude, not_deployed_counter, reg_ind, operator, d_dttm, z_id, last_end_trip_dttm, land_ind, s_ind, status_change_dttm, case_ind, last_po_change_dttm, violated_duration, vehicle_id, longitude, file_status, unhired_duration, eo_lat, reply_eo_ind, license_ind, indiscriminately_parked_ind, eo_lng, officer_id, case_status, color_status_cd, parking_id, ttr_dttm, deployed_ind, status) FROM 's3://XXX/development/test.menifest' CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXXX;aws_secret_access_key=MYXXXXXXXXXXXXXXXXXXXXXX' json 's3://yourbucket/jsonpath' menifest;

Вот подробный справочник для Манифеста.

Я надеюсь, что это дает вам некоторые идеи, как двигаться дальше, и если вы видите конкретную ошибку, я был бы рад перефокусироваться на ответ.