Удалить логи кафки для использованных сообщений, используя SCS
Новичок в использовании кафки и весеннего облачного потока. Нужна помощь.
Настроить
- У меня есть два подпружиненных приложения App-1, App-2.
- Я использую Spring Cloud Stream и Spring-Cloud-Stream-Binder-Kafka для асинхронной связи.
- Есть одна тема ТЕМА-1
Случай использования
- Предположим, приложение-1 отправило сообщение по теме TOPIC-1, которое приложение-2 слушало.
- Приложение-2 приняло сообщение и успешно обработало его.
- Теперь смещение этой темы увеличивается.
Вопрос
- Как реализовать механизм удаления только успешно использованных данных сообщения из журналов kafka по истечении заданного периода времени?
В Кафке ответственность за то, что было потреблено, является ответственностью потребителя. Итак, я думаю, в весеннем облачном потоке kafka должен быть какой-то механизм управления журналом сообщений kafka, о котором я не знаю.
ПРИМЕЧАНИЕ 1. Я знаю о времени хранения журнала kafka и свойствах диска. Но логи кафки будут удалены даже для неиспользованных сообщений.
ПРИМЕЧАНИЕ 2: я прошел через этот вопрос, но это не может помочь.
1 ответ
В Кафке нет такого механизма, о котором я знаю; и, конечно, не в Spring Cloud Stream или библиотеках, на которых он основан. Клиенты Kafka не имеют доступа к таким низкоуровневым конструкциям.
Кроме того, смещения потребителей полностью отделены от журналов тем; у современных брокеров они хранятся в специальной теме.
РЕДАКТИРОВАТЬ
Согласно комментарию ниже, kafka-delete-records.sh
инструмент командной строки может быть использован.
Обратите внимание, что это использует Scala AdminClient
который не находится на пути к классам SCSt по умолчанию (начиная с 2.0).
Тем не менее, Java AdminClient
поддерживает аналогичные функции:
/**
* Delete records whose offset is smaller than the given offset of the corresponding partition.
*
* This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
* See the overload for more details.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param recordsToDelete The topic partitions and related offsets from which records deletion starts.
* @return The DeleteRecordsResult.
*/
public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
}
Вы можете создать AdminClient
используя ботинки AutoConfiguration
KafkaAdmin
,
AdminClient client = AdminClient.create(kafkaAdmin.getConfig());