Как использовать requestShutdown и shutdown для корректного завершения работы в случае библиотеки KCL Java для AWS Kinesis
Я пытаюсь использовать новую функцию библиотеки KCL в Java для AWS Kinesis, чтобы выполнить постепенное выключение, зарегистрировавшись с помощью ловушки shutdown, чтобы остановить все процессоры записи, а затем грациозно работника. Новая библиотека предоставляет новый интерфейс, для которого необходимо реализовать процессоры записи. Но как это вызывается?
Попытался вызвать сначала worker.requestShutdown(), затем worker.shutdown(), и все заработало. Но это какой-то намеренный способ его использования. Какая польза от того, чтобы использовать и то, и другое?
1 ответ
Начиная потребителя
Как вы знаете, когда вы создаете Worker
, Это
1) создает таблицу смещения потребителя в DynamodB
2) создавать арендные договоры, составлять график аренды и возобновлять аренду через заданный интервал времени
Если у вас есть два раздела, то в одной и той же таблице DynamodB будет две записи, что означает, что раздел требует аренды.
например.
{
"checkpoint": "TRIM_HORIZON",
"checkpointSubSequenceNumber": 0,
"leaseCounter": 38,
"leaseKey": "shardId-000000000000",
"leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
"ownerSwitchesSinceCheckpoint": 0
}
{
"checkpoint": "49570828493343584144205257440727957974505808096533676050",
"checkpointSubSequenceNumber": 0,
"leaseCounter": 40,
"leaseKey": "shardId-000000000001",
"leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
"ownerSwitchesSinceCheckpoint": 0
}
- график получения и продления аренды осуществляется координатором аренды ScheduledExecutorService (называется
leaseCoordinatorThreadPool
)
3) Затем для каждого раздела в потоке, Worker
создает внутренний PartitionConsumer, который фактически извлекает события и отправляет их RecordProcessor#processRecords
, см. ProcessTask # call
4) по вашему вопросу вы должны зарегистрировать свой IRecordProcessorFactory
подразумевать worker
, который даст один ProcessorFactoryImpl
для каждого PartitionConsumer
,
например. посмотрите пример здесь, который может быть полезным
KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration(
"consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId")
.withKinesisClientConfig(getHttpConfiguration())
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); // "TRIM_HORIZON" = from the tip of the stream
Worker consumerWorker = new Worker.Builder()
.recordProcessorFactory(new DavidsEventProcessorFactory())
.config(streamConfig)
.dynamoDBClient(new DynamoDB(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration())))
.build();
public class DavidsEventProcessorFactory implements IRecordProcessorFactory {
private Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class);
@Override
public IRecordProcessor createProcessor() {
logger.info("Creating an EventProcessor.");
return new DavidsEventPartitionProcessor();
}
}
class DavidsEventPartitionProcessor implements IRecordProcessor {
private Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);
//TODO add consumername ?
private String partitionId;
private ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE;
public KinesisEventPartitionProcessor() {
}
@Override
public void initialize(InitializationInput initializationInput) {
this.partitionId = initializationInput.getShardId();
logger.info("Initialised partition {} for streaming.", partitionId);
}
@Override
public void processRecords(ProcessRecordsInput recordsInput) {
recordsInput.getRecords().forEach(nativeEvent -> {
String eventPayload = new String(nativeEvent.getData().array());
logger.info("Processing an event {} : {}" , nativeEvent.getSequenceNumber(), eventPayload);
//update offset after configured amount of retries
try {
recordsInput.getCheckpointer().checkpoint();
logger.debug("Persisted the consumer offset to {} for partition {}",
nativeEvent.getSequenceNumber(), partitionId);
} catch (InvalidStateException e) {
logger.error("Cannot update consumer offset to the DynamoDB table.", e);
e.printStackTrace();
} catch (ShutdownException e) {
logger.error("Consumer Shutting down", e);
e.printStackTrace();
}
});
}
@Override
public void shutdown(ShutdownInput shutdownReason) {
logger.debug("Shutting down event processor for {}", partitionId);
if(shutdownReason.getShutdownReason() == RE_PARTITIONING) {
try {
shutdownReason.getCheckpointer().checkpoint();
} catch (InvalidStateException e) {
logger.error("Cannot update consumer offset to the DynamoDB table.", e);
e.printStackTrace();
} catch (ShutdownException e) {
logger.error("Consumer Shutting down", e);
e.printStackTrace();
}
}
}
}
// затем начинаем потребителя
consumerWorker.run();
Остановка потребителя
Теперь, когда вы хотите остановить свой экземпляр Consumer (Worker
), вам не нужно много общаться с каждым PartitionConsumer
, о котором позаботятся Worker
как только вы попросите его закрыть.
с
shutdown
спрашиваетleaseCoordinatorThreadPool
чтобы остановить, который был ответственным за возобновление и получение аренды, и ожидает прекращения.requestShutdown
с другой стороны отменяет арендодателя, и уведомляетPartitionConsumer
о закрытии.
И более важная вещь с requestShutdown
если вы хотите получать уведомления о вашем RecordProcessor
тогда вы можете реализовать IShutdownNotificationAware
также. Таким образом, в случае состояния гонки, когда ваш RecordProcessor
обрабатывает событие, но работник собирается завершить работу, вы все равно сможете зафиксировать свое смещение и затем завершить работу.
requestShutdown
возвращает ShutdownFuture
, который затем перезванивает worker.shutdown
Вы должны будете реализовать следующий метод на вашем RecordProcessor
получать уведомления о requestShutdown
,
class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware {
private String partitionId;
// few implementations
@Override
public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
logger.debug("Shutdown requested for {}", partitionId);
}
}
Но если вы потеряете договор об аренде до того, как уведомите об этом, он может не позвонить
Резюме к вашим вопросам
Новая библиотека предоставляет новый интерфейс, для которого необходимо реализовать процессоры записи. Но как это вызывается?
- реализовать
IRecordProcessorFactory
а такжеIRecordProcessor
, - затем подключите
RecordProcessorFactory
на вашWorker
,
Попытался вызвать сначала worker.requestShutdown(), затем worker.shutdown(), и все заработало. Но есть ли намеренный способ его использования?
Вы должны использовать requestShutdown()
для постепенного отключения, которое позаботится о состоянии гонки. Это было введено в kinesis-client-1.7.1