org.apache.kafka.common.KafkaException: не удалось создать потребителя kafka при загрузке данных в Pinot

Я пытаюсь принять данные Kerberous kafka в apache Pinot.

Я могу выполнить тот же процесс, и данные были успешно загружены в мою таблицу Pinot без аутентификации Kerberos, но движение, которое я пытаюсь добавить, добавить дополнительный протокол безопасности и местоположение файла Jaas.config и принять данные из Kerberous kafka, которые он выбрасывает.

          error:"org.apache.kafka.common.KafkaException: Failed to construct kafka consumer".

Я не могу понять, что я сделал не так, а что пропустил. Так что любая помощь будет отличной, потому что я не знаком с аутентификацией Kerberos.

Я даю подробную информацию о моих файлах jaas.conf и pinot table_config.json (где я делаю все эти Kerberous) настройки kafka для получения данных из темы kafka с Kerberous аутентификацией.

          jaas.conf

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    doNotPrompt=true
    keyTab="<keytab path>"
    storeKey=true
    useTicketCache=false
    renewTicket=true
    serviceName="kafka"
    principal="kafka.service@username";
    };

    Pinot_table_Cofig.Json

  {
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
  "timeColumnName": "timestampInEpoch",
  "timeType": "MILLISECONDS",
  "schemaName": "transcript",
  "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
  "loadMode": "MMAP",
   "streamConfigs": {
   "streamType": "kafka",
   "security.protocal":"SASL_PLAINTEXT"
   "Djava.security.auth.login.config": "jaas.config file path"
   "stream.kafka.consumer.type": "lowlevel",
   "stream.kafka.topic.name": "kafka_-topic-name",
   "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
  "stream.kafka.consumer.factory.class.name":    "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.broker.list": "kerberous kafka server details(localhost:9092)"
  "realtime.segment.flush.threshold.size": "0",
  "realtime.segment.flush.threshold.time": "24h",
 "realtime.segment.flush.desired.size": "50M",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
   }
   },
  "metadata": {
  "customConfigs": {}
  }
  }

Я также пробовал stream.kafka.security.protocal вместо security.protocal, но его время ожидания истекло при получении ошибки метаданных темы для stream.kafka.security.protocal. Может кто-нибудь, пожалуйста, помогите мне.

Спасибо большое за вашу поддержку.

PS Я не пишу какой-либо код java или scala для этого, чтобы выполнить это непосредственно на данных терминала, уже доступных в теме Kafka, просто предоставляя здесь все подробности и используя данные тем Kafka. Если я выполняю ту же конфигурацию после удаления jaas.config и security.protocol, то то же самое отлично работает на не Kerberos Kafka. Он выдает только ошибку, когда я делаю на Kerberos Kafka с деталями аутентификации.

0 ответов

Другие вопросы по тегам