org.apache.kafka.common.KafkaException: не удалось создать потребителя kafka при загрузке данных в Pinot
Я пытаюсь принять данные Kerberous kafka в apache Pinot.
Я могу выполнить тот же процесс, и данные были успешно загружены в мою таблицу Pinot без аутентификации Kerberos, но движение, которое я пытаюсь добавить, добавить дополнительный протокол безопасности и местоположение файла Jaas.config и принять данные из Kerberous kafka, которые он выбрасывает.
error:"org.apache.kafka.common.KafkaException: Failed to construct kafka consumer".
Я не могу понять, что я сделал не так, а что пропустил. Так что любая помощь будет отличной, потому что я не знаком с аутентификацией Kerberos.
Я даю подробную информацию о моих файлах jaas.conf и pinot table_config.json (где я делаю все эти Kerberous) настройки kafka для получения данных из темы kafka с Kerberous аутентификацией.
jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
doNotPrompt=true
keyTab="<keytab path>"
storeKey=true
useTicketCache=false
renewTicket=true
serviceName="kafka"
principal="kafka.service@username";
};
Pinot_table_Cofig.Json
{
"tableName": "transcript",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "timestampInEpoch",
"timeType": "MILLISECONDS",
"schemaName": "transcript",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"security.protocal":"SASL_PLAINTEXT"
"Djava.security.auth.login.config": "jaas.config file path"
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "kafka_-topic-name",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "kerberous kafka server details(localhost:9092)"
"realtime.segment.flush.threshold.size": "0",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.desired.size": "50M",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
},
"metadata": {
"customConfigs": {}
}
}
Я также пробовал stream.kafka.security.protocal вместо security.protocal, но его время ожидания истекло при получении ошибки метаданных темы для stream.kafka.security.protocal. Может кто-нибудь, пожалуйста, помогите мне.
Спасибо большое за вашу поддержку.
PS Я не пишу какой-либо код java или scala для этого, чтобы выполнить это непосредственно на данных терминала, уже доступных в теме Kafka, просто предоставляя здесь все подробности и используя данные тем Kafka. Если я выполняю ту же конфигурацию после удаления jaas.config и security.protocol, то то же самое отлично работает на не Kerberos Kafka. Он выдает только ошибку, когда я делаю на Kerberos Kafka с деталями аутентификации.