Как использовать requestShutdown и shutdown для корректного завершения работы в случае библиотеки KCL Java для AWS Kinesis

Я пытаюсь использовать новую функцию библиотеки KCL в Java для AWS Kinesis, чтобы выполнить постепенное выключение, зарегистрировавшись с помощью ловушки shutdown, чтобы остановить все процессоры записи, а затем грациозно работника. Новая библиотека предоставляет новый интерфейс, для которого необходимо реализовать процессоры записи. Но как это вызывается?

Попытался вызвать сначала worker.requestShutdown(), затем worker.shutdown(), и все заработало. Но это какой-то намеренный способ его использования. Какая польза от того, чтобы использовать и то, и другое?

1 ответ

Решение

Начиная потребителя

Как вы знаете, когда вы создаете Worker, Это

1) создает таблицу смещения потребителя в DynamodB

2) создавать арендные договоры, составлять график аренды и возобновлять аренду через заданный интервал времени

Если у вас есть два раздела, то в одной и той же таблице DynamodB будет две записи, что означает, что раздел требует аренды.

например.

{
  "checkpoint": "TRIM_HORIZON",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 38,
  "leaseKey": "shardId-000000000000",
  "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
  "ownerSwitchesSinceCheckpoint": 0
}

{
  "checkpoint": "49570828493343584144205257440727957974505808096533676050",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 40,
  "leaseKey": "shardId-000000000001",
  "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
  "ownerSwitchesSinceCheckpoint": 0
}
  • график получения и продления аренды осуществляется координатором аренды ScheduledExecutorService (называется leaseCoordinatorThreadPool)

3) Затем для каждого раздела в потоке, Worker создает внутренний PartitionConsumer, который фактически извлекает события и отправляет их RecordProcessor#processRecords, см. ProcessTask # call

4) по вашему вопросу вы должны зарегистрировать свой IRecordProcessorFactory подразумевать worker, который даст один ProcessorFactoryImpl для каждого PartitionConsumer,

например. посмотрите пример здесь, который может быть полезным

KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration(
 "consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId")
            .withKinesisClientConfig(getHttpConfiguration())
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); // "TRIM_HORIZON" = from the tip of the stream

Worker consumerWorker = new Worker.Builder()
            .recordProcessorFactory(new DavidsEventProcessorFactory())
            .config(streamConfig)
            .dynamoDBClient(new DynamoDB(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration())))
            .build();


public class DavidsEventProcessorFactory implements IRecordProcessorFactory {

    private Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class);

    @Override
    public IRecordProcessor createProcessor() {
        logger.info("Creating an EventProcessor.");
        return new DavidsEventPartitionProcessor();
    }
}

class DavidsEventPartitionProcessor implements IRecordProcessor {

    private Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);

    //TODO add consumername ?

    private String partitionId;

    private ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE;

    public KinesisEventPartitionProcessor() {
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.partitionId = initializationInput.getShardId();
        logger.info("Initialised partition {} for streaming.", partitionId);
    }

    @Override
    public void processRecords(ProcessRecordsInput recordsInput) {
        recordsInput.getRecords().forEach(nativeEvent -> {
            String eventPayload = new String(nativeEvent.getData().array());
            logger.info("Processing an event {} : {}" , nativeEvent.getSequenceNumber(), eventPayload);

            //update offset after configured amount of retries
            try {
                recordsInput.getCheckpointer().checkpoint();
                logger.debug("Persisted the consumer offset to {} for partition {}",
                        nativeEvent.getSequenceNumber(), partitionId);
            } catch (InvalidStateException e) {
                logger.error("Cannot update consumer offset to the DynamoDB table.", e);
                e.printStackTrace();
            } catch (ShutdownException e) {
                logger.error("Consumer Shutting down", e);
                e.printStackTrace();
            }
        });
    }

    @Override
    public void shutdown(ShutdownInput shutdownReason) {
        logger.debug("Shutting down event processor for {}", partitionId);

        if(shutdownReason.getShutdownReason() == RE_PARTITIONING) {
            try {
                shutdownReason.getCheckpointer().checkpoint();
            } catch (InvalidStateException e) {
                logger.error("Cannot update consumer offset to the DynamoDB table.", e);
                e.printStackTrace();
            } catch (ShutdownException e) {
                logger.error("Consumer Shutting down", e);
                e.printStackTrace();
            }
        }
    }

}

// затем начинаем потребителя

consumerWorker.run();

Остановка потребителя

Теперь, когда вы хотите остановить свой экземпляр Consumer (Worker), вам не нужно много общаться с каждым PartitionConsumer, о котором позаботятся Worker как только вы попросите его закрыть.

  • с shutdownспрашивает leaseCoordinatorThreadPool чтобы остановить, который был ответственным за возобновление и получение аренды, и ожидает прекращения.

  • requestShutdown с другой стороны отменяет арендодателя, и уведомляет PartitionConsumerо закрытии.

И более важная вещь с requestShutdown если вы хотите получать уведомления о вашем RecordProcessor тогда вы можете реализовать IShutdownNotificationAware также. Таким образом, в случае состояния гонки, когда ваш RecordProcessor обрабатывает событие, но работник собирается завершить работу, вы все равно сможете зафиксировать свое смещение и затем завершить работу.

requestShutdown возвращает ShutdownFuture, который затем перезванивает worker.shutdown

Вы должны будете реализовать следующий метод на вашем RecordProcessor получать уведомления о requestShutdown,

class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware {

   private String partitionId;

   // few implementations

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        logger.debug("Shutdown requested for {}", partitionId);
    }

}

Но если вы потеряете договор об аренде до того, как уведомите об этом, он может не позвонить

Резюме к вашим вопросам

Новая библиотека предоставляет новый интерфейс, для которого необходимо реализовать процессоры записи. Но как это вызывается?

  • реализовать IRecordProcessorFactory а также IRecordProcessor,
  • затем подключите RecordProcessorFactory на ваш Worker,

Попытался вызвать сначала worker.requestShutdown(), затем worker.shutdown(), и все заработало. Но есть ли намеренный способ его использования?

Вы должны использовать requestShutdown() для постепенного отключения, которое позаботится о состоянии гонки. Это было введено в kinesis-client-1.7.1

Другие вопросы по тегам