Загрузка данных CSV в DynamoDB с использованием конвейера данных
Привет я пытаюсь импортировать CSV-файл из S3 собирается DynamoDB.
Сценарий выглядит следующим образом:
{
"objects": [
{
"writeThroughputPercent": "#{myDDBWriteThroughputRatio}",
"name": "DDBDestinationTable",
"id": "DDBDestinationTable",
"type": "DynamoDBDataNode",
"region": "ap-southeast-2",
"tableName": "testing-ariza"
},
{
"failureAndRerunMode": "CASCADE",
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "s3://kronos-development/REDSHIFT-PIPELINE/",
"scheduleType": "ONDEMAND",
"name": "Default",
"id": "Default"
},
{
"directoryPath": "s3://<path>/testing",
"name": "S3InputDataNode",
"id": "S3InputDataNode",
"type": "S3DataNode"
},
{
"bootstrapAction": "s3://#{myDDBRegion}.elasticmapreduce/bootstrap-actions/configure-hadoop, --mapred-key-value,mapreduce.map.speculative=false",
"name": "EmrClusterForLoad",
"coreInstanceCount": "1",
"coreInstanceType": "m3.xlarge",
"amiVersion": "3.9.0",
"id": "EmrClusterForLoad",
"masterInstanceType": "m3.xlarge",
"region": "#{myDDBRegion}",
"type": "EmrCluster",
"terminateAfter": "60 Minutes"
},
{
"output": {
"ref": "DDBDestinationTable"
},
"input": {
"ref": "S3InputDataNode"
},
"maximumRetries": "2",
"name": "TableLoadActivity",
"step": "s3://elasticmapreduce/libs/script-runner/script-runner.jar,\ns3://elasticmapreduce/libs/hive/hive-script,\n--run-hive-script,\n--hive-versions,\nlatest,\n--base-path,\ns3://elasticmapreduce/libs/hive/,\n--args,\n-f,\ns3://<path>/test.q",
"runsOn": {
"ref": "EmrClusterForLoad"
},
"id": "TableLoadActivity",
"type": "EmrActivity",
"resizeClusterBeforeRunning": "true"
}
],
"parameters": [
{
"description": "Input S3 folder",
"id": "myInputS3Loc",
"type": "AWS::S3::ObjectKey"
},
{
"description": "Target DynamoDB table name",
"id": "myDDBTableName",
"type": "String"
},
{
"default": "0.25",
"watermark": "Enter value between 0.1-1.0",
"description": "DynamoDB write throughput ratio",
"id": "myDDBWriteThroughputRatio",
"type": "Double"
},
{
"default": "us-east-1",
"watermark": "us-east-1",
"description": "Region of the DynamoDB table",
"id": "myDDBRegion",
"type": "String"
}
],
"values": {
"myDDBRegion": "ap-southeast-2",
"myDDBTableName": "testing-ariza",
"myDDBWriteThroughputRatio": "0.25",
"myInputS3Loc": "s3://kronos-development/REDSHIFT-PIPELINE/"
}
}
Цель состоит в том, чтобы загрузить данные CSV из s3, идущие в DynamodB. Этот скрипт вызовет скрипт улья, хранящийся как s3:///test.q
CREATE EXTERNAL TABLE s3_table (
array STRING,
boolean STRING,
null STRING,
number int,
object STRING,
string string )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
LOCATION "s3://<path>"
TBLPROPERTIES ("serialization.null.format"="");
CREATE EXTERNAL TABLE ddb_table (
array STRING,
boolean STRING,
null STRING,
number int,
object STRING,
string string )
STORED BY
'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES (
"dynamodb.table.name" = "testing",
"dynamodb.column.mapping" =
"array:array,boolean:boolean,null:null,number:number,object:object,string:string"
);
INSERT OVERWRITE TABLE ddb_table SELECT * FROM s3_table;
Однако я получаю ошибку с шагами в EMR. Я создал этот поток, используя Построение, используя Шаблон 'Импортировать данные резервного копирования DynamodB из S3', и добавил шаг в EMR.
s3://elasticmapreduce/libs/script-runner/script-runner.jar,
s3://elasticmapreduce/libs/hive/hive-script,
--run-hive-script,
--hive-versions,
latest,
--base-path,
s3://elasticmapreduce/libs/hive/,
--args,
-f,
s3://<path>/test.q
Я действительно новичок в услугах AWS. Спасибо!