Я пытаюсь создать продюсера для AWS MSK с помощью приложения Springboot, способного создать его из клиента EC2 (с помощью kafka-console-producer.sh)
При выводе сообщения в мск (кафка 2.1.0) получаю
"Исключение выдается при отправке сообщения с
key='null'
а такжеpayload='Message->0'
к теме AWSKafkaTopic"
Я пытаюсь создать его из приложения Springboot, развернутого в EC2 с помощью Docker. Но производитель работает нормально, когда я пытаюсь создать сообщение от того же клиента EC2, используя kafka-console-producer.sh.
bin/kafka-console-producer.sh --broker-list "XXBootstrapBrokerStringTlsXX" --producer.config client.properties --topic AWSKafkaTopic
Я попробовал ту же самую программу на своем локальном компьютере с kafka 2.3.0 и zookeeper, там она работает нормально (работает приложение springboot на докере).
Config->
@Value("${spring.kafka.producer.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
Client->
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
public void sendMessage(String message){
this.kafkaTemplate.send("AWSKafkaTopic",message);
}
Actual result->
ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [XXBootstrapBrokerStringTlsXX]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
Журнал:
2019-07-24 07:40:43.305 INFO 1 --- [nio-9000-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
2019-07-24 07:40:43.305 INFO 1 --- [nio-9000-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
2019-07-24 07:41:43.313 ERROR 1 --- [nio-9000-exec-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='Message->0' to topic AWSKafkaTopic:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
1 ответ
В моем случае я попытался создать сообщение в новой теме, но
false
в брокере aws лучше создать сообщение в существующей теме или
auto.create.topics.enable
установить это свойство как
true
и попробуйте.