Не удалось создать производителя Kafka при отправке списка строк.
Я новичок в Kafka и пытаюсь прочитать текстовый файл и создать список строк, которые хочу отправить потребителям. Я использую Java 21 и Spring Boot 3.2.0 (SNAPSHOT).
Это конфигурация производителя Kafka:
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String, Object> producerConfig(){
Map<String,Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
return properties;
}
@Bean
public ProducerFactory<String, List<String>> producerFactory(){
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, List<String>> kafkaTemplate(ProducerFactory<String,List<String>> producerFactory){
return new KafkaTemplate<>(producerFactory);
}
}
Это конфигурация темы:
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic alexTopic(){
return TopicBuilder.name("securityTopic")
.build();
}
}
Это «application.properties»:
spring.servlet.multipart.enabled=true
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
spring.kafka.bootstrap-servers=192.168.241.133:9092
Вот как я получаю файл:
public Optional uploadFile(MultipartFile file){
if(file.isEmpty()){
return Optional.of(new EmptyFileException("File is empty please double check"));
}
return Optional.of(this.upload.uploadFile(file));
}
Вот как я отправляю «List<String>» в Кафку:
public UploadFileService(KafkaTemplate<String, List<String>> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public String uploadFile(MultipartFile file) {
try(BufferedReader reader = new BufferedReader(
new InputStreamReader(file.getInputStream()))){
String line;
int i=0;
List<String> list = new ArrayList<>();
while ((line=reader.readLine())!=null) {
if(i==10){
this.kafkaTemplate.send("securityTopic",list);
Если я отправляю чистые строки и использую:
StringSerializer.class
вместо:
ListSerializer.class.getName()
в конфигурации производителя Kafka, а также измените другие дженерики на String вместо List<String>. Но как мне тогда отправить List<String>? Я снова использую Kafka в течение 2 или 3 дней и не знаю, нужно ли мне создавать собственный сериализатор или использовать что-то встроенное?
Ошибка, которая мне выдает, такая же, как в заголовке: «Не удалось создать продюсера Кафки».
Я также пытался использовать «ListSerializer.class», но не работает.
Я также пытался использовать «JsonSerialize.class», но ошибка все та же.
Спасибо, что нашли время прочитать это!