AWS Kinesis Stream использует node.js без ввода
Я использую этот пример.
В этом потребителе мне нужно передать входные данные, откуда я могу получить такие как initializeInput
, processRecordsInput
для соответствующих методов:
Вот код,
initialize: function(initializeInput, completeCallback) {
console.log("consumer called initialize");
shardId = initializeInput.shardId;
console.log("shard: " + shardId);
completeCallback();
},
processRecords: function(processRecordsInput, completeCallback) {
console.log("consumer called processRecords");
if (!processRecordsInput || !processRecordsInput.records) {
console.log("no processRecords");
completeCallback();
return;
}
var records = processRecordsInput.records;
var record, data, sequenceNumber, partitionKey;
console.log("total records: " + records);
for (var i = 0; i < records.length; ++i) {
record = records[i];
data = new Buffer(record.data, 'base64').toString();
sequenceNumber = record.sequenceNumber;
partitionKey = record.partitionKey;
log.info(util.format('ShardID: %s, Record: %s, SeqenceNumber: %s, PartitionKey:%s', shardId, data, sequenceNumber, partitionKey));
}
if (!sequenceNumber) {
console.log("no sequenceNumber");
completeCallback();
return;
}
// If checkpointing, completeCallback should only be called once checkpoint is complete.
processRecordsInput.checkpointer.checkpoint(sequenceNumber, function(err, sequenceNumber) {
log.info(util.format('Checkpoint successful. ShardID: %s, SeqenceNumber: %s', shardId, sequenceNumber));
console.log("processRecordsInput.checkpointer.checkpoint");
completeCallback();
});
}