Amazon KCL Checkpoint и Trim Horizon
Как связаны контрольные точки и обрезка в библиотеке AWS KCL?
Страница документации Обработка запуска, выключения и регулирования говорит:
По умолчанию KCL начинает читать записи из кончика потока; это самая последняя добавленная запись. В этой конфигурации, если приложение, создающее данные, добавляет записи в поток до того, как будут запущены какие-либо обработчики принимающих записей, записи не будут считываться обработчиками записей после их запуска.
Чтобы изменить поведение процессоров записей так, чтобы они всегда считывали данные с начала потока, установите следующее значение в файле свойств для приложения Amazon Kinesis Streams:
initialPositionInStream = TRIM_HORIZON
Страница документации " Разработка клиентской библиотеки Amazon Kinesis на Java" гласит:
Для потоков требуется, чтобы процессор записей отслеживал записи, которые уже были обработаны в сегменте. KCL заботится об этом отслеживании для вас, передавая контрольную точку (IRecordProcessorCheckpointer) в processRecords. Обработчик записей вызывает метод контрольной точки на этом интерфейсе, чтобы сообщить KCL о том, как далеко он продвинулся в обработке записей в сегменте. В случае сбоя работника KCL использует эту информацию для возобновления обработки сегмента с последней известной обработанной записи.
Кажется, что на первой странице KCL возобновляется в конце потока, а вторая страница - в последней известной обработанной записи (которая была помечена как обработанная RecordProcessor
с использованием checkpointer
). В моем случае мне обязательно нужно перезапустить последнюю обработанную запись. Нужно ли устанавливать initialPositionInStream в TRIM_HORIZON?
3 ответа
С потоком кинезиса у вас есть две опции, вы можете прочитать самые новые записи или начать с самой старой (TRIM_HORIZON).
Но, как только вы запустили ваше приложение, оно просто читает с позиции, в которой оно остановилось, используя свои контрольные точки. Вы можете увидеть эти контрольные точки в DynamodB (обычно имя таблицы в качестве имени приложения). Поэтому, если вы перезапустите свое приложение, оно обычно будет продолжаться с того места, где оно остановилось.
Ответ - нет, вам не нужно устанавливать initialPositionInStream в TRIM_HORIZON.
Когда вы читаете события из потока Kinesis, у вас есть 4 варианта:
TRIM_HORIZON - самые старые события, которые все еще находятся в потоковых осколках, прежде чем они будут автоматически обрезаны (по умолчанию 1 день, но могут быть продлены до 7 дней). Вы будете использовать эту опцию, если хотите запустить новое приложение, которое будет обрабатывать все записи, доступные в потоке, но потребуется некоторое время, пока оно сможет догнать и начать обработку событий в режиме реального времени.
ПОСЛЕДНИЕ - новейшие события в потоке, и игнорировать все прошедшие события. Вы будете использовать эту опцию, если запустите новое приложение, которое хотите немедленно обработать.
AT/AFTER_SEQUENCE_NUMBER - порядковый номер обычно является контрольной точкой, которую вы сохраняете во время обработки событий. Эти контрольные точки позволяют вам надежно обрабатывать события, даже в случае сбоя считывателя или когда вы хотите обновить его версию и продолжить обработку всех событий и не потерять ни одно из них. Разница между AT/AFTER основана на времени вашей контрольной точки, до или после успешной обработки событий.
Обратите внимание, что это единственная опция, относящаяся к осколку, поскольку все остальные опции являются глобальными для потока. Когда вы используете KCL, он управляет таблицей DynamoDB для этого приложения с записью для каждого сегмента с "текущим" порядковым номером для этого фрагмента.
AT_TIMESTAMP - расчетное время события, помещенного в поток. Вы будете использовать эту опцию, если хотите найти конкретные события для обработки на основе их временной метки. Например, если вы знаете, что в конкретный момент времени в вашей службе было событие из реальной жизни, вы можете разработать приложение, которое будет обрабатывать эти конкретные события, даже если у вас нет порядкового номера.
Подробности смотрите в документации по Kinesis здесь: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
Вы должны использовать "TRIM_HORIZON". Это повлияет только на то, когда ваше приложение начнет читать записи из потока. После этого он продолжит с последней известной позиции.