Kafka-streams: установка политики очистки внутренних тем для удаления не работает
Я использую функцию уменьшения потоков kafka, и она создает некоторую внутреннюю тему журнала изменений хранилища состояний (например, app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog).
Я хотел установить байты хранения и изменить политику очистки для удаления, чтобы предотвратить переполнение хранилища. Поэтому я установил следующие конфиги в коде потоков kafka:
Properties props = new Properties();
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.RETENTION_BYTES_CONFIG, Constants.INTERNAL_TOPICS_RETENTION_BYTES);
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
KafkaStreams streams = new KafkaStreams(builder.build(), props);
Однако при создании нового раздела к вновь созданному внутреннему разделу применяется только конфигурация хранения, а политика очистки остается компактной.
Есть ли пропущенный шаг для этого? (или нельзя ли установить политику очистки внутренних тем для удаления?)
Я использую kafka версии 1.0.0 и kafka-streams версии 1.0.0
1 ответ
Спасибо Гожану за его ответ в списке рассылки kafka:
Описанная вами проблема выглядит как старая ошибка, исправленная начиная с версии 1.1.0 (как часть исправления в https://jira.apache.org/jira/browse/KAFKA-6150).
... Вам не нужно обновлять брокер, чтобы использовать более новые версии библиотеки Streams.
Обновление версии kafka-streams до 1.1.0 решило проблему.