Ошибка подключения Hive Metastore во время пакетной потоковой передачи в Spark с использованием apache Iceberg

Ниже приведен код, который я создал для представления TempStreamingDeviceData. Но я получаю исключение, поскольку не могу подключиться к хранилищу метаданных Hive (Hive Metastore). Я также настроил каталог Spark для Apache Iceberg.

      /** Upserts one micro-batch into the Iceberg table `local.db.deviceData`.
        *
        * Intended for use with `foreachBatch` in Structured Streaming: the batch
        * is cached, exposed as a temp view, merged via Iceberg's MERGE INTO
        * (update matched devices, insert new ones), then uncached.
        *
        * @param batchDF the micro-batch DataFrame delivered by foreachBatch
        * @param batchID the micro-batch id (currently unused; kept for the
        *                foreachBatch signature)
        */
      def writeStreaming(batchDF: DataFrame, batchID: Long): Unit = {
        // Cache: the batch is read again by the MERGE source subquery.
        batchDF.persist()
        // FIX: original code had the typo `batcDFcreateOrReplaceTempView(...)`
        // (missing receiver and dot), which does not compile.
        batchDF.createOrReplaceTempView("TempStreamingDeviceData")
        spark.sql(
          " MERGE INTO local.db.deviceData t USING (SELECT * FROM TempStreamingDeviceData) s ON t.deviceId = s.deviceId WHEN MATCHED THEN UPDATE SET t.platformName = s.platformName,t.deviceType = s.deviceType WHEN NOT MATCHED THEN INSERT (externalId,deviceId,tenantId,name,deviceType,platformName,usr_userIdentifier) VALUES(s.externalId,s.deviceId,s.tenantId,s.name,s.deviceType,s.platformName,s.usr_userIdentifier)"
        )
        // Release the cached batch once the merge has completed.
        batchDF.unpersist()
      }

Ниже приведено исключение

              at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: org.apache.iceberg.hive.RuntimeMetaException: Failed to connect to Hive Metastore
        at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:63)
        at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:30)
        at org.apache.iceberg.hive.ClientPool.get(ClientPool.java:117)
        at org.apache.iceberg.hive.ClientPool.run(ClientPool.java:52)
        at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:130)
        at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:86)
        at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:69)
        at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:92)
        at org.apache.iceberg.CachingCatalog$$Lambda$2745/0000000063E75980.apply(Unknown Source)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2337)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache$$Lambda$2746/000000006B0ACBE0.apply(Unknown Source)
        at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1864)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2335)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2318)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:111)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:54)
        at org.apache.iceberg.CachingCatalog.loadTable(CachingCatalog.java:96)
        at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:452)
        at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:116)
        at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:79)
        at org.apache.iceberg.spark.SparkSessionCatalog.loadTable(SparkSessionCatalog.java:118)
        at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:283)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.loaded$lzycompute$1(Analyzer.scala:1010)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.loaded$1(Analyzer.scala:1010)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$3(Analyzer.scala:1022)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$$Lambda$2744/000000006AFA9F70.apply(Unknown Source)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupRelation(Analyzer.scala:1021)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:977)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:962)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$3(AnalysisHelper.scala:90)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$Lambda$1477/00000000691CFA70.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$1(AnalysisHelper.scala:90)

0 ответов

Другие вопросы по тегам