кафка рассчитан на то, чтобы иметь много производителей

Мы тестировали kafka (облачную) для реализации системы передачи данных многих производителей (примерно 27000 Linux-машин), потребителя (spring kafka listener) и темы с 10 разделами, проблема в том, что когда 9500 производителей передают одновременно потребление ЦП всех узлов достигает 100%, и кластер отключается и перестает отвечать. Эта кафка предназначена для этого типа архитектуры, или вам стоит поискать другие варианты.

Вот мои настройки:

Кластер Kafka:4 узла на основе облачной карафки (4 ГБ оперативной памяти + 900 ГБ диск) на узел

Производитель:kafka-clients-1.1.1.jar + JDK 1.7

                  Properties config = new Properties();
            config.put("bootstrap.servers", "***");
            config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            config.put("security.protocol", "SASL_SSL");
            config.put("sasl.mechanism", "SCRAM-SHA-256");
            config.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");        
            config.put("delivery.timeout.ms", 0);        
            config.put("transactional.id", UUID.randomUUID().toString());
            config.put("enable.idempotence", true);
            config.put("compression.type", "gzip");
    
            try ( KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
                String dataJson = "700 bytes json String";
                ProducerRecord<String, String> data = new ProducerRecord<>("test-topic", dataJson);
                
                producer.initTransactions();
                try {
                    log.info("behin " + data);
                    producer.beginTransaction();
                    producer.send(data);
                    producer.commitTransaction();
                    log.info("commit " + data);
                    config = null;
                    producer.close();
                } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                    log.error("kafka exception"+e);
                    producer.close();
                } catch (KafkaException e) {
                    log.error("kafka exception"+e);
                    log.info("ABORT");
                    producer.abortTransaction();
                }
            } finally {
                 log.info("finally");
            }

Конфигурация потребителя загрузки Spring:

      spring:
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: oracle.jdbc.OracleDriver
    url: jdbc:oracle:thin:@x.x.x.251:1521/xe
    username: username
    password: password
    hikari:      
      maximum-pool-size: 12      
      pool-name: test-pool
  jpa:
    database-platform: org.hibernate.dialect.Oracle10gDialect
    show-sql: true
    hibernate:
      ddl-auto: none    
    properties:
      hibernate:
        default_schema: hr
        type: trace
        format_sql: trace
  kafka:
    bootstrap-servers: ***********
    consumer:
      group-id: group-id
      topic: test-topic
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        mechanism: SCRAM-SHA-256
        jaas:
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
      jass:
        enabled: false
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: io.confluent.kafka.serializers.KafkaJsonSerializer

    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void processMessage(String dataJson, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
            @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

        try {            
            log.info("json: "+dataJson);
            service.save(dataJson);
        } catch (SQLException | JsonProcessingException ex) {
           
            service.trackError(dataJson, ex.getLocalizedMessage());
        }

    }

любая помощь будет принята с благодарностью. Спасибо

0 ответов

Другие вопросы по тегам