Как удалить данные, которые уже были использованы потребителем? Кафка

Я делаю репликацию данных в Кафке. Но размер файла журнала kafka увеличивается очень быстро. Размер достигает 5 Гб в день. Как решение этой проблемы, я хочу немедленно удалить обработанные данные. Я использую метод удаления записи в AdminClient для удаления смещения. Но когда я смотрю на файл журнала, данные, соответствующие этому смещению, не удаляются.

RecordsToDelete recordsToDelete = RedcordsToDelete.beforeOffset(offset);
TopicPartition topicPartition = new TopicPartition(topicName,partition);
Map<TopicPartition,RecordsToDelete> deleteConf = new HashMap<>();
deleteConf.put(topicPartition,recordsToDelete);
adminClient.deleteRecords(deleteConf);

Я не хочу такие предложения, как (log.retention.hours, log.retention.bytes, log.segment.bytes, log.cleanup.policy= удалить)

Потому что я просто хочу удалить данные, потребляемые потребителем. В этом решении я также удалил данные, которые не потребляются.

Каковы ваши предложения?

3 ответа

Вы не сделали ничего плохого. Предоставленный вами код работает, и я его протестировал. На случай, если я что-то пропустил в вашем коде, мой:

    public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
    TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
    Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
    deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
    kafkaAdminClient.deleteRecords(deleteMap);
}

Я использовал группу: 'org.apache.kafka', имя: 'kafka-clients', версия: '2.0.0'

Так что проверьте, если вы нацеливаетесь на правильный раздел ( 0 для первого)

Проверьте версию своего брокера: https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html говорит:

Эта операция поддерживается брокерами с версией 0.11.0.0

Создайте сообщения из того же приложения, чтобы убедиться, что вы подключены правильно.

Есть еще один вариант, который вы можете рассмотреть. Использование cleanup.policy=compact Если ваши ключи сообщений повторяются, вы можете извлечь из этого пользу. Не только потому, что старые сообщения для этого ключа будут автоматически удаляться, но вы можете использовать тот факт, что сообщение с нулевой полезной нагрузкой удаляет все сообщения для этого ключа. Только не забудьте установить значения delete.retention.ms и min.compaction.lag.ms достаточно маленькими. В этом случае вы можете использовать сообщение, а затем создать нулевую полезную нагрузку для того же ключа (но будьте осторожны с этим подходом, так как таким образом вы можете удалять сообщения (с этим ключом), которые вы не использовали)

Попробуй это

DeleteRecordsResult result = adminClient.deleteRecords(recordsToDelete);
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = result.lowWatermarks();
try {
    for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
        System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark());
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
adminClient.close();

В этом коде нужно позвонить entry.getValue().get().lowWatermark(), поскольку adminClient.deleteRecords(recordsToDelete) возвращает карту Futures, вам нужно дождаться запуска Future, вызвав get()

Этот код будет работать только в том случае, если политика очистки «удалить» или «сжать, удалить», иначе код выдаст исключение «Нарушение политики».

Другие вопросы по тегам