Кинезис получает данные от нескольких осколков
Я пытаюсь создать простое приложение, которое считывает данные из AWS Kinesis. Мне удалось прочитать данные с помощью одного шарда, но я хочу получить данные от 4 разных шардов.
Проблема в том, что у меня есть цикл while, который повторяется до тех пор, пока шард активен, что не позволяет мне читать данные из разных шардов. До сих пор я не мог найти альтернативный алгоритм и не смог реализовать решение на основе KCL. Спасибо заранее
public static void DoSomething() {
AmazonKinesisClient client = new AmazonKinesisClient();
//noinspection deprecation
client.setEndpoint(endpoint, serviceName, regionId);
/** get shards from the stream using describe stream method*/
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(streamName);
List<Shard> shards = new ArrayList<>();
String exclusiveStartShardId = null;
do {
describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest);
shards.addAll(describeStreamResult.getStreamDescription().getShards());
if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
} else {
exclusiveStartShardId = null;
}
}while (exclusiveStartShardId != null);
/** shards obtained */
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(streamName);
getShardIteratorRequest.setShardId(shards.get(0).getShardId());
getShardIteratorRequest.setShardIteratorType("LATEST");
GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
while (!shardIterator.equals(null)) {
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(250);
GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
shardIterator = getRecordsResult.getNextShardIterator();
if(records.size()!=0) {
for(Record r : records) {
System.out.println(r.getPartitionKey());
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
2 ответа
Рекомендуется, чтобы вы не читали из одного процесса / работника из нескольких сегментов. Во-первых, как вы можете видеть, это усложняет ваш код, но, что более важно, у вас будут проблемы с расширением.
"Секрет" масштабируемости заключается в том, чтобы иметь небольших и независимых работников или другие подобные подразделения. Такой дизайн вы можете увидеть в Hadoop, DynamoDB или Kinesis в AWS. Это позволяет создавать небольшие системы (микро-сервисы), которые можно легко масштабировать по мере необходимости. Вы можете легко добавить больше единиц работы / данных по мере того, как ваша служба станет более успешной или из-за других колебаний в ее использовании.
Как вы можете видеть в этих сервисах AWS, иногда вы можете получить эту масштабируемость автоматически, например, в DynamoDB, а иногда вам нужно добавлять осколки в ваши потоки kinesis. Но для вашего приложения вам нужно как-то контролировать свою масштабируемость.
В случае Kinesis вы можете увеличивать и уменьшать масштаб с помощью AWS Lambda или Kinesis Client Library (KCL). Оба они слушают состояние ваших потоков (количество сегментов и событий) и используют его для добавления или удаления рабочих и доставки событий для их обработки. В обоих этих решениях вы должны создать работника, который работает против одного осколка.
Если вам нужно выровнять события из нескольких сегментов, вы можете сделать это с помощью некоторой службы состояний, такой как Redis или DynamoDB.
Для более простого и аккуратного решения, когда вам нужно беспокоиться только о предоставлении собственного кода обработки сообщений, я бы рекомендовал использовать библиотеку KCL.
Цитата из документации
KCL действует как посредник между вашей логикой обработки записей и Kinesis Data Streams. KCL выполняет следующие задачи:
- Подключается к потоку данных
- Перечисляет осколки в потоке данных
- Использует аренду для координации ассоциаций осколков со своими работниками.
- Создает процессор записи для каждого сегмента, которым он управляет.
- Извлекает записи данных из потока данных
- Помещает записи в соответствующий обработчик записей
- Контрольно-пропускные пункты обработаны записи
- Уравновешивает ассоциации осколков (аренда) при изменении количества экземпляров рабочих процессов или при повторном разделении потока данных (разделение или слияние осколков).