Потребитель KCL не обрабатывает записи
Я прошел через несколько вопросов и ТАК вопросов и не смог заставить работать пример приложения.
Вот как это воспроизвести;
git clone https://github.com/awslabs/amazon-kinesis-client-python.git
cd amazon-kinesis-client-python
# Start sample producer
sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster
# Do setup
python3 setup.py download_jars &&\
python3 setup.py install
# Run sample app
`amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties prod.properties`
Ниже приведены единственные вещи, которые я изменил в моих prod.properties
executableName = python3 /amazon-kinesis-client-python/samples/sample_kclpy_app.py
dynamoDBEndpoint = http://127.0.0.1:8000
applicationName = test
maxRecords = 10000
idleTimeBetweenReadsInMillis = 200
callProcessRecordsEvenForEmptyRecordList = true
в sample_kclpy_app.py
мой process_record
функция выглядит так;
def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
"""
Called for each record that is passed to process_records.
:param str data: The blob of data that was contained in the record.
:param str partition_key: The key associated with this recod.
:param int sequence_number: The sequence number associated with this record.
:param int sub_sequence_number: the sub sequence number associated with this record.
"""
####################################
# Insert your processing logic here
####################################
print('Data: ', data)
with open('/log.txt', 'a') as the_file:
the_file.write(data)
return
Мои логи
/amazon-kinesis-client-python # `amazon_kclpy_helper.py --print_command --java /
usr/bin/java --properties prod.properties`
Mar 07, 2018 11:36:26 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator getConfiguration
INFO: Value of workerId is not provided in the properties. WorkerId is automatically assigned as: 13bdbb57-e701-4be1-b2ca-6b808fa95b73
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property regionName with value us-east-1
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property idleTimeBetweenReadsInMillis with value 200
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property maxRecords with value 10000
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property callProcessRecordsEvenForEmptyRecordList with value true
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property dynamoDBEndpoint with value http://127.0.0.1:8000
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property initialPositionInStream with value TRIM_HORIZON
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig buildExecutorService
INFO: Using a cached thread pool.
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig <init>
INFO: Running test to process stream words with executable python3
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using workerId: bleh
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using credentials with access key id: bleh
Mar 07, 2018 11:36:27 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: MultiLangDaemon is adding the following fields to the User Agent: amazon-kinesis-client-library-java-1.9.0 amazon-kinesis-multi-lang-daemon/1.0.1 python/3.6 python3
Mar 07, 2018 11:36:28 AM com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator <init>
INFO: With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
Mar 07, 2018 11:36:28 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initialization attempt 1
Mar 07, 2018 11:36:28 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initializing LeaseCoordinator
Mar 07, 2018 11:36:37 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator initialize
INFO: Created new lease table for coordinator with initial read capacity of 10 and write capacity of 10.
Mar 07, 2018 11:36:41 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Syncing Kinesis shard info
Mar 07, 2018 11:36:43 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Starting LeaseCoordinator
Mar 07, 2018 11:36:46 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker computeLeasesToTake
INFO: Worker 13bdbb57-e701-4be1-b2ca-6b808fa95b73 saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
Mar 07, 2018 11:36:48 AM com.amazonaws.services.kinesis.leases.impl.LeaseTaker takeLeases
INFO: Worker 13bdbb57-e701-4be1-b2ca-6b808fa95b73 successfully took 1 leases: shardId-000000000000
Mar 07, 2018 11:36:56 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker run
INFO: Initialization complete. Starting worker loop.
Mar 07, 2018 11:36:58 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker infoForce
INFO: Created new shardConsumer for : ShardInfo [shardId=shardId-000000000000, concurrencyToken=bleh, parentShardIds=[], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}]
Mar 07, 2018 11:36:58 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask call
INFO: No need to block on parents [] of shard shardId-000000000000
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher initialize
INFO: Initializing shard shardId-000000000000 with TRIM_HORIZON
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing InitializeMessage to child process for shard shardId-000000000000
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading STDERR for shardId-000000000000
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.MessageWriter call
INFO: Message size == 110 bytes for shard shardId-000000000000
Mar 07, 2018 11:36:59 AM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000000
Mar 07, 2018 11:37:29 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:37:29 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 11:38:39 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:38:39 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 11:39:42 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:39:42 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 11:40:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:40:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Mar 07, 2018 11:41:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000000
Mar 07, 2018 11:41:45 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...