AWS kinesis getRecords возвращает пустые записи []
Я играю с кинезисом, пробовал очень простой пример. Шаги, которые я выполнил: Выполнить продюсер, чтобы поместить некоторые записи, которые успешно работают.
Ошибка на стороне потребителя при выполнении getRecords. Я попытался изменить все эти методы для получения записей из api:'AT_SEQUENCE_NUMBER | AFTER_SEQUENCE_NUMBER | TRIM_HORIZON | ПОСЛЕДНИЙ'.
Ответ:
Aws\Result Object
([данные: Aws\Result:private] => Массив ([Записи] => Массив ()
[NextShardIterator] => AAAAAAAAAA.....
[MillisBehindLatest] => 0
[@metadata] => Array
Исходный код:
$streamName = 'test';
$numberOfRecordsPerBatch = 10000;
require_once 'vendor/autoload.php';
$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis([
'region' => '{REGION}',
'version' => '2013-12-02',
'credentials' => [
'key' => '{API_KEY}',
'secret' => '{API_SECRET}'
]
]);
// get all shard ids
$res = $kinesisClient->describeStream([ 'StreamName' => $streamName ]);
$shardIds = $res->search('StreamDescription.Shards[].ShardId');
$count = 0;
$startTime = microtime(true);
foreach ($shardIds as $shardId) {
echo "ShardId: $shardId\n";
// get initial shard iterator
$res = $kinesisClient->getShardIterator([
'ShardId' => $shardId,
'ShardIteratorType' => 'LATEST', // 'AT_SEQUENCE_NUMBER| AFTER_SEQUENCE_NUMBER | TRIM_HORIZON|LATEST'
// 'StartingSequenceNumber' => '<string>',
'StreamName' => $streamName,
]);
$shardIterator = $res->get('ShardIterator');
do {
echo "Get Records\n";
$res = $kinesisClient->getRecords([
'Limit' => $numberOfRecordsPerBatch,
'ShardIterator' => $shardIterator
]);
$shardIterator = $res->get('NextShardIterator');
$localCount = 0;
foreach ($res->search('Records[].[SequenceNumber, Data]') as $data) {
list($sequenceNumber, $item) = $data;
echo "- [$sequenceNumber] $item\n";
$count++;
$localCount++;
}
echo "Processed $localCount records in this batch\n";
sleep(1);
} while ($localCount>0);
}
Я также изучил документацию AWS и убедился, что все данные, которые мы отправляем, верны, но мы не получили никаких записей в ответ.
Спасибо
1 ответ
Когда вы загружаете записи в поток данных kinesis, вы получаете порядковый номер для каждой записи. Порядковые номера для одного и того же ключа раздела обычно со временем увеличиваются. Чем больше период времени между запросами на запись, тем больше становятся порядковые номера.
когда вы выполняете GetShardIterator, вы в основном указываете на конкретный порядковый номер в этом сегменте. Нет гарантии, что полученные данные доступны по текущему указателю. Так что вполне возможно, что первый GetRecords может не вернуть никаких записей. Вам нужно запустить GetRecords в цикле. в настоящее время ваше условие while не будет выполнено, если сначала GetRecords не вернет никакого результата. Вместо этого у вас может быть условие, чтобы проверить, не имеет ли NextShardIterator значение null, при непрерывном чтении из шарда.
Если вы хотите получить записи при первом вызове GetRecords, тогда
- сохранить порядковый номер, возвращенный в качестве ответа на вызов PutRecord.
- Используйте тип итератора сегмента «AT_SEQUENCE_NUMBER» в GetShardIterator и укажите сохраненный порядковый номер в поле StartingSequenceNumber.
- запустить GetRecords с итератором сегментов, возвращенным на шаге 2