кафка рассчитан на то, чтобы иметь много производителей
Мы тестировали kafka (облачную) для реализации системы передачи данных многих производителей (примерно 27000 Linux-машин), потребителя (spring kafka listener) и темы с 10 разделами, проблема в том, что когда 9500 производителей передают одновременно потребление ЦП всех узлов достигает 100%, и кластер отключается и перестает отвечать. Эта кафка предназначена для этого типа архитектуры, или вам стоит поискать другие варианты.
Вот мои настройки:
Кластер Kafka:4 узла на основе облачной карафки (4 ГБ оперативной памяти + 900 ГБ диск) на узел
Производитель:kafka-clients-1.1.1.jar + JDK 1.7
Properties config = new Properties();
config.put("bootstrap.servers", "***");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("security.protocol", "SASL_SSL");
config.put("sasl.mechanism", "SCRAM-SHA-256");
config.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
config.put("delivery.timeout.ms", 0);
config.put("transactional.id", UUID.randomUUID().toString());
config.put("enable.idempotence", true);
config.put("compression.type", "gzip");
try ( KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
String dataJson = "700 bytes json String";
ProducerRecord<String, String> data = new ProducerRecord<>("test-topic", dataJson);
producer.initTransactions();
try {
log.info("behin " + data);
producer.beginTransaction();
producer.send(data);
producer.commitTransaction();
log.info("commit " + data);
config = null;
producer.close();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
log.error("kafka exception"+e);
producer.close();
} catch (KafkaException e) {
log.error("kafka exception"+e);
log.info("ABORT");
producer.abortTransaction();
}
} finally {
log.info("finally");
}
Конфигурация потребителя загрузки Spring:
spring:
datasource:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: oracle.jdbc.OracleDriver
url: jdbc:oracle:thin:@x.x.x.251:1521/xe
username: username
password: password
hikari:
maximum-pool-size: 12
pool-name: test-pool
jpa:
database-platform: org.hibernate.dialect.Oracle10gDialect
show-sql: true
hibernate:
ddl-auto: none
properties:
hibernate:
default_schema: hr
type: trace
format_sql: trace
kafka:
bootstrap-servers: ***********
consumer:
group-id: group-id
topic: test-topic
properties:
security:
protocol: SASL_SSL
sasl:
mechanism: SCRAM-SHA-256
jaas:
config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
jass:
enabled: false
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: io.confluent.kafka.serializers.KafkaJsonSerializer
@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void processMessage(String dataJson, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
try {
log.info("json: "+dataJson);
service.save(dataJson);
} catch (SQLException | JsonProcessingException ex) {
service.trackError(dataJson, ex.getLocalizedMessage());
}
}
любая помощь будет принята с благодарностью. Спасибо