Удалить логи кафки для использованных сообщений, используя SCS

Новичок в использовании кафки и весеннего облачного потока. Нужна помощь.

Настроить

  • У меня есть два подпружиненных приложения App-1, App-2.
  • Я использую Spring Cloud Stream и Spring-Cloud-Stream-Binder-Kafka для асинхронной связи.
  • Есть одна тема ТЕМА-1


Случай использования

  • Предположим, приложение-1 отправило сообщение по теме TOPIC-1, которое приложение-2 слушало.
  • Приложение-2 приняло сообщение и успешно обработало его.
  • Теперь смещение этой темы увеличивается.

Вопрос

  • Как реализовать механизм удаления только успешно использованных данных сообщения из журналов kafka по истечении заданного периода времени?

В Кафке ответственность за то, что было потреблено, является ответственностью потребителя. Итак, я думаю, в весеннем облачном потоке kafka должен быть какой-то механизм управления журналом сообщений kafka, о котором я не знаю.


ПРИМЕЧАНИЕ 1. Я знаю о времени хранения журнала kafka и свойствах диска. Но логи кафки будут удалены даже для неиспользованных сообщений.

ПРИМЕЧАНИЕ 2: я прошел через этот вопрос, но это не может помочь.

1 ответ

Решение

В Кафке нет такого механизма, о котором я знаю; и, конечно, не в Spring Cloud Stream или библиотеках, на которых он основан. Клиенты Kafka не имеют доступа к таким низкоуровневым конструкциям.

Кроме того, смещения потребителей полностью отделены от журналов тем; у современных брокеров они хранятся в специальной теме.

РЕДАКТИРОВАТЬ

Согласно комментарию ниже, kafka-delete-records.sh инструмент командной строки может быть использован.

Обратите внимание, что это использует Scala AdminClient который не находится на пути к классам SCSt по умолчанию (начиная с 2.0).

Тем не менее, Java AdminClient поддерживает аналогичные функции:

/**
 * Delete records whose offset is smaller than the given offset of the corresponding partition.
 *
 * This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
 * See the overload for more details.
 *
 * This operation is supported by brokers with version 0.11.0.0 or higher.
 *
 * @param recordsToDelete       The topic partitions and related offsets from which records deletion starts.
 * @return                      The DeleteRecordsResult.
 */
public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
    return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
}

Вы можете создать AdminClient используя ботинки AutoConfigurationKafkaAdmin,

AdminClient client = AdminClient.create(kafkaAdmin.getConfig());
Другие вопросы по тегам