Исключение NullPointerException при подключении к брокеру Kafka с помощью SASL/SCRAM

Мы настроили в приложении два брокера Kafka: YAML, один с SASL KERBEROS, а другой с SASL SCRAM. При запуске службы он подключается к брокеру с помощью SASL KERBEROS и получает ошибку ниже для другого брокера (SASL SCRAM). Когда мы подключаемся к одному брокеру с SALS SCRAM в приложении YAML, он подключается без ошибок

================================================== ============================================ Установите для состояния клиента SASL значение RECEIVE_APIVERSIONS_RESPONSE main] oakcsaSaslClientAuthenticator Установить состояние клиента SASL на SEND_HANDSHAKE_REQUEST main] oakcsaSaslClientAuthenticator Установить состояние клиента SASL на RECEIVE_HANDSHAKE_RESPONSE main] oakcsaSaslClientAuthenticator. закрытие соединения

java.lang.NullPointerException: значение null в org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:389) в org.apache.kafka.common.security.ClientAhenticator.Anticator.Sasl :296) по адресу org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:237)

Application.YAML

        binders:
   binder1:
    type: kafka
    environment:
     spring:
      cloud:
       stream:
        kafka:
         binder:
          replication-factor: 1
          brokers: ${eventhub.broker.hosts2}
          zkNodes: ${eventhub.zookeper.hosts2}
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: GSSAPI
            ssl:
              truststore:
                location: ${eventhub.broker.cert.location2}
                password: ${eventhub.broker.cert.password2}

          jaas:
           options:
            useKeyTab: true
            storeKey: true
            keyTab: /scratch/kafka/kafka2/krb5.keytab
            serviceName: kafka
            principal: kafka/XXXXXXXXXXXXXXXX.COM
         default:
          consumer:
           autoCommitOffset: false

   binder2:
    type: kafka
    environment:
     spring:
      cloud:
       stream:
        kafka:
         binder:
          brokers: ${eventhub.broker.hosts} # 10.40.158.93:9093
          zkNodes: ${eventhub.zookeper.hosts} #10.40.158.93:2181
          autoCreateTopics: false
          zkConnectionTimeout: 36000
          headers: 
           - event
           - sourceSystem
           - userId
           - branchCode
           - kafka_messageKey
          jaas:
            loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
            options:
              username: ${eventhub.broker.user}
              password: ${eventhub.broker.password}
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256
            ssl:
              enabled:
              truststore:
                location: ${eventhub.broker.cert.location}
                password: ${eventhub.broker.cert.password}

1 ответ

Вместо того, чтобы полагаться на настройку JAAS конфигурацию через переплет или настройку java.security.auth.login.configсвойство, когда у вас есть несколько кластеров с разными контекстами безопасности в одном приложении, вам необходимо использовать подходы, упомянутые в KIP-85. По сути, вам нужно установить свойство, которое имеет приоритет над другими методами. Используя sasl.jaas.config, вы можете переопределить ограничения, наложенные JVM, в которых используется статический контекст безопасности для всей JVM, игнорируя, таким образом, любые последующие конфигурации JAAS, найденные после первой.

Вот пример приложения, которое демонстрирует, как подключиться к нескольким кластерам Kafka с разными контекстами безопасности в качестве приложения с несколькими связывателями.

Другие вопросы по тегам