Не удалось создать производителя Kafka при отправке списка строк.

Я новичок в Kafka и пытаюсь прочитать текстовый файл и создать список строк, которые хочу отправить потребителям. Я использую Java 21 и Spring Boot 3.2.0 (SNAPSHOT).

Это конфигурация производителя Kafka:

      @Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig(){
        Map<String,Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());

        return properties;
    }

    @Bean
    public ProducerFactory<String, List<String>> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, List<String>> kafkaTemplate(ProducerFactory<String,List<String>> producerFactory){
        return new KafkaTemplate<>(producerFactory);
    }
}

Это конфигурация темы:

      @Configuration
public class KafkaTopicConfig {
    
    @Bean
    public NewTopic alexTopic(){
        return TopicBuilder.name("securityTopic")
                .build();
    }
}

Это «application.properties»:

      spring.servlet.multipart.enabled=true
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
spring.kafka.bootstrap-servers=192.168.241.133:9092

Вот как я получаю файл:

      public Optional uploadFile(MultipartFile file){
    if(file.isEmpty()){
        return Optional.of(new EmptyFileException("File is empty please double check"));
    }
    
    return Optional.of(this.upload.uploadFile(file));
}

Вот как я отправляю «List<String>» в Кафку:

      public UploadFileService(KafkaTemplate<String, List<String>> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

@Override
public String uploadFile(MultipartFile file) {
    try(BufferedReader reader = new BufferedReader(
                                    new InputStreamReader(file.getInputStream()))){

        String line;
        int i=0;
        List<String> list = new ArrayList<>();
        while ((line=reader.readLine())!=null) {
            if(i==10){
                this.kafkaTemplate.send("securityTopic",list);

Если я отправляю чистые строки и использую:

      StringSerializer.class

вместо:

      ListSerializer.class.getName()

в конфигурации производителя Kafka, а также измените другие дженерики на String вместо List<String>. Но как мне тогда отправить List<String>? Я снова использую Kafka в течение 2 или 3 дней и не знаю, нужно ли мне создавать собственный сериализатор или использовать что-то встроенное?

Ошибка, которая мне выдает, такая же, как в заголовке: «Не удалось создать продюсера Кафки».

Я также пытался использовать «ListSerializer.class», но не работает.

Я также пытался использовать «JsonSerialize.class», но ошибка все та же.

Спасибо, что нашли время прочитать это!

0 ответов

Другие вопросы по тегам