AWS kinesis getRecords возвращает пустые записи []

Я играю с кинезисом, пробовал очень простой пример. Шаги, которые я выполнил: Выполнить продюсер, чтобы поместить некоторые записи, которые успешно работают.

Ошибка на стороне потребителя при выполнении getRecords. Я попытался изменить все эти методы для получения записей из api:'AT_SEQUENCE_NUMBER | AFTER_SEQUENCE_NUMBER | TRIM_HORIZON | ПОСЛЕДНИЙ'.

Ответ:

      Aws\Result Object

([данные: Aws\Result:private] => Массив ([Записи] => Массив ()

              [NextShardIterator] => AAAAAAAAAA.....
        [MillisBehindLatest] => 0
        [@metadata] => Array

Исходный код:

      $streamName = 'test';
$numberOfRecordsPerBatch = 10000;

require_once 'vendor/autoload.php';
$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis([
    'region' => '{REGION}',
    'version' => '2013-12-02',
    'credentials' => [
        'key' => '{API_KEY}',
        'secret' => '{API_SECRET}'
    ]
]);

// get all shard ids
$res = $kinesisClient->describeStream([ 'StreamName' => $streamName ]);
$shardIds = $res->search('StreamDescription.Shards[].ShardId');

$count = 0;
$startTime = microtime(true);

foreach ($shardIds as $shardId) {
    echo "ShardId: $shardId\n";

    // get initial shard iterator
    $res = $kinesisClient->getShardIterator([
        'ShardId' => $shardId,
        'ShardIteratorType' => 'LATEST', // 'AT_SEQUENCE_NUMBER| AFTER_SEQUENCE_NUMBER | TRIM_HORIZON|LATEST'
        // 'StartingSequenceNumber' => '<string>',
        'StreamName' => $streamName,
    ]);
    $shardIterator = $res->get('ShardIterator');

    do {
        echo "Get Records\n";
        $res = $kinesisClient->getRecords([
            'Limit' => $numberOfRecordsPerBatch,
            'ShardIterator' => $shardIterator
        ]);

        $shardIterator = $res->get('NextShardIterator');
        $localCount = 0;

        foreach ($res->search('Records[].[SequenceNumber, Data]') as $data) {
            list($sequenceNumber, $item) = $data;
            echo "- [$sequenceNumber] $item\n";
            $count++;
            $localCount++;
        }
        echo "Processed $localCount records in this batch\n";
        sleep(1);
    } while ($localCount>0);
}

Я также изучил документацию AWS и убедился, что все данные, которые мы отправляем, верны, но мы не получили никаких записей в ответ.

Спасибо

1 ответ

Когда вы загружаете записи в поток данных kinesis, вы получаете порядковый номер для каждой записи. Порядковые номера для одного и того же ключа раздела обычно со временем увеличиваются. Чем больше период времени между запросами на запись, тем больше становятся порядковые номера.

когда вы выполняете GetShardIterator, вы в основном указываете на конкретный порядковый номер в этом сегменте. Нет гарантии, что полученные данные доступны по текущему указателю. Так что вполне возможно, что первый GetRecords может не вернуть никаких записей. Вам нужно запустить GetRecords в цикле. в настоящее время ваше условие while не будет выполнено, если сначала GetRecords не вернет никакого результата. Вместо этого у вас может быть условие, чтобы проверить, не имеет ли NextShardIterator значение null, при непрерывном чтении из шарда.

Если вы хотите получить записи при первом вызове GetRecords, тогда

  1. сохранить порядковый номер, возвращенный в качестве ответа на вызов PutRecord.
  2. Используйте тип итератора сегмента «AT_SEQUENCE_NUMBER» в GetShardIterator и укажите сохраненный порядковый номер в поле StartingSequenceNumber.
  3. запустить GetRecords с итератором сегментов, возвращенным на шаге 2
Другие вопросы по тегам