Кинезис получает данные от нескольких осколков

Я пытаюсь создать простое приложение, которое считывает данные из AWS Kinesis. Мне удалось прочитать данные с помощью одного шарда, но я хочу получить данные от 4 разных шардов.

Проблема в том, что у меня есть цикл while, который повторяется до тех пор, пока шард активен, что не позволяет мне читать данные из разных шардов. До сих пор я не мог найти альтернативный алгоритм и не смог реализовать решение на основе KCL. Спасибо заранее

public static void DoSomething() {
        AmazonKinesisClient client = new AmazonKinesisClient();
        //noinspection deprecation
        client.setEndpoint(endpoint, serviceName, regionId);  
        /** get shards from the stream using describe stream method*/

        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
        describeStreamRequest.setStreamName(streamName);
        List<Shard> shards = new ArrayList<>();
        String exclusiveStartShardId = null;
        do {
            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
            DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest);
            shards.addAll(describeStreamResult.getStreamDescription().getShards());
            if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
            } else {
                exclusiveStartShardId = null;
            }
        }while (exclusiveStartShardId != null);

        /** shards obtained */
        String shardIterator;

        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        getShardIteratorRequest.setStreamName(streamName);
        getShardIteratorRequest.setShardId(shards.get(0).getShardId());
        getShardIteratorRequest.setShardIteratorType("LATEST"); 

        GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
        shardIterator = getShardIteratorResult.getShardIterator();
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();

        while (!shardIterator.equals(null)) {
            getRecordsRequest.setShardIterator(shardIterator);
            getRecordsRequest.setLimit(250);
            GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
            List<Record> records = getRecordsResult.getRecords();

            shardIterator = getRecordsResult.getNextShardIterator();
            if(records.size()!=0) {
                for(Record r : records) {
                    System.out.println(r.getPartitionKey());
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {

            }
        }
    }

2 ответа

Рекомендуется, чтобы вы не читали из одного процесса / работника из нескольких сегментов. Во-первых, как вы можете видеть, это усложняет ваш код, но, что более важно, у вас будут проблемы с расширением.

"Секрет" масштабируемости заключается в том, чтобы иметь небольших и независимых работников или другие подобные подразделения. Такой дизайн вы можете увидеть в Hadoop, DynamoDB или Kinesis в AWS. Это позволяет создавать небольшие системы (микро-сервисы), которые можно легко масштабировать по мере необходимости. Вы можете легко добавить больше единиц работы / данных по мере того, как ваша служба станет более успешной или из-за других колебаний в ее использовании.

Как вы можете видеть в этих сервисах AWS, иногда вы можете получить эту масштабируемость автоматически, например, в DynamoDB, а иногда вам нужно добавлять осколки в ваши потоки kinesis. Но для вашего приложения вам нужно как-то контролировать свою масштабируемость.

В случае Kinesis вы можете увеличивать и уменьшать масштаб с помощью AWS Lambda или Kinesis Client Library (KCL). Оба они слушают состояние ваших потоков (количество сегментов и событий) и используют его для добавления или удаления рабочих и доставки событий для их обработки. В обоих этих решениях вы должны создать работника, который работает против одного осколка.

Если вам нужно выровнять события из нескольких сегментов, вы можете сделать это с помощью некоторой службы состояний, такой как Redis или DynamoDB.

Для более простого и аккуратного решения, когда вам нужно беспокоиться только о предоставлении собственного кода обработки сообщений, я бы рекомендовал использовать библиотеку KCL.

Цитата из документации

KCL действует как посредник между вашей логикой обработки записей и Kinesis Data Streams. KCL выполняет следующие задачи:

  • Подключается к потоку данных
  • Перечисляет осколки в потоке данных
  • Использует аренду для координации ассоциаций осколков со своими работниками.
  • Создает процессор записи для каждого сегмента, которым он управляет.
  • Извлекает записи данных из потока данных
  • Помещает записи в соответствующий обработчик записей
  • Контрольно-пропускные пункты обработаны записи
  • Уравновешивает ассоциации осколков (аренда) при изменении количества экземпляров рабочих процессов или при повторном разделении потока данных (разделение или слияние осколков).
Другие вопросы по тегам