AWS Kinesis Stream использует node.js без ввода

Я использую этот пример.

В этом потребителе мне нужно передать входные данные, откуда я могу получить такие как initializeInput, processRecordsInput для соответствующих методов:

Вот код,

initialize: function(initializeInput, completeCallback) {
        console.log("consumer called initialize");
        shardId = initializeInput.shardId;
        console.log("shard: " + shardId);
        completeCallback();
},
processRecords: function(processRecordsInput, completeCallback) {
    console.log("consumer called processRecords");
    if (!processRecordsInput || !processRecordsInput.records) {
        console.log("no processRecords");
        completeCallback();
        return;
    }
    var records = processRecordsInput.records;
    var record, data, sequenceNumber, partitionKey;

    console.log("total records: " + records);
    for (var i = 0; i < records.length; ++i) {
        record = records[i];
        data = new Buffer(record.data, 'base64').toString();
        sequenceNumber = record.sequenceNumber;
        partitionKey = record.partitionKey;
        log.info(util.format('ShardID: %s, Record: %s, SeqenceNumber: %s, PartitionKey:%s', shardId, data, sequenceNumber, partitionKey));
    }
    if (!sequenceNumber) {
        console.log("no sequenceNumber");
        completeCallback();
        return;
    }
    // If checkpointing, completeCallback should only be called once checkpoint is complete.
    processRecordsInput.checkpointer.checkpoint(sequenceNumber, function(err, sequenceNumber) {
        log.info(util.format('Checkpoint successful. ShardID: %s, SeqenceNumber: %s', shardId, sequenceNumber));
        console.log("processRecordsInput.checkpointer.checkpoint");
        completeCallback();
    });
}

0 ответов

Другие вопросы по тегам