Исключение NullPointerException при подключении к брокеру Kafka с помощью SASL/SCRAM
Мы настроили в приложении два брокера Kafka: YAML, один с SASL KERBEROS, а другой с SASL SCRAM. При запуске службы он подключается к брокеру с помощью SASL KERBEROS и получает ошибку ниже для другого брокера (SASL SCRAM). Когда мы подключаемся к одному брокеру с SALS SCRAM в приложении YAML, он подключается без ошибок
================================================== ============================================ Установите для состояния клиента SASL значение RECEIVE_APIVERSIONS_RESPONSE main] oakcsaSaslClientAuthenticator Установить состояние клиента SASL на SEND_HANDSHAKE_REQUEST main] oakcsaSaslClientAuthenticator Установить состояние клиента SASL на RECEIVE_HANDSHAKE_RESPONSE main] oakcsaSaslClientAuthenticator. закрытие соединения
java.lang.NullPointerException: значение null в org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:389) в org.apache.kafka.common.security.ClientAhenticator.Anticator.Sasl :296) по адресу org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:237)
Application.YAML
binders:
binder1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
replication-factor: 1
brokers: ${eventhub.broker.hosts2}
zkNodes: ${eventhub.zookeper.hosts2}
configuration:
security:
protocol: SASL_SSL
sasl:
mechanism: GSSAPI
ssl:
truststore:
location: ${eventhub.broker.cert.location2}
password: ${eventhub.broker.cert.password2}
jaas:
options:
useKeyTab: true
storeKey: true
keyTab: /scratch/kafka/kafka2/krb5.keytab
serviceName: kafka
principal: kafka/XXXXXXXXXXXXXXXX.COM
default:
consumer:
autoCommitOffset: false
binder2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ${eventhub.broker.hosts} # 10.40.158.93:9093
zkNodes: ${eventhub.zookeper.hosts} #10.40.158.93:2181
autoCreateTopics: false
zkConnectionTimeout: 36000
headers:
- event
- sourceSystem
- userId
- branchCode
- kafka_messageKey
jaas:
loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
options:
username: ${eventhub.broker.user}
password: ${eventhub.broker.password}
configuration:
security:
protocol: SASL_SSL
sasl:
mechanism: SCRAM-SHA-256
ssl:
enabled:
truststore:
location: ${eventhub.broker.cert.location}
password: ${eventhub.broker.cert.password}
1 ответ
Вместо того, чтобы полагаться на настройку
JAAS
конфигурацию через переплет или настройку
java.security.auth.login.config
свойство, когда у вас есть несколько кластеров с разными контекстами безопасности в одном приложении, вам необходимо использовать подходы, упомянутые в KIP-85. По сути, вам нужно установить свойство, которое имеет приоритет над другими методами. Используя
sasl.jaas.config
, вы можете переопределить ограничения, наложенные JVM, в которых используется статический контекст безопасности для всей JVM, игнорируя, таким образом, любые последующие конфигурации JAAS, найденные после первой.
Вот пример приложения, которое демонстрирует, как подключиться к нескольким кластерам Kafka с разными контекстами безопасности в качестве приложения с несколькими связывателями.