Ошибка подключения к Hive Metastore при потоковой обработке микропакетов в Spark с использованием Apache Iceberg
Ниже приведён код, в котором я создаю временное представление TempStreamingDeviceData и выполняю MERGE в таблицу Iceberg. Но я получаю исключение, поскольку не удаётся подключиться к хранилищу метаданных Hive (Hive Metastore). Каталог Spark для Apache Iceberg я также настроил.
/**
 * Upserts one micro-batch into the Iceberg table `local.db.deviceData`.
 *
 * The batch is exposed as the temporary view `TempStreamingDeviceData` and
 * merged into the target table on `deviceId`: matching rows get their
 * `platformName` and `deviceType` updated, non-matching rows are inserted.
 *
 * The signature matches what `DataStreamWriter.foreachBatch` expects, so this
 * is presumably used as its callback.
 *
 * @param batchDF rows of the current micro-batch
 * @param batchID identifier of the micro-batch (not used by the body)
 */
def writeStreaming(batchDF: DataFrame, batchID: Long): Unit = {
  // Cache the batch while it is being merged.
  batchDF.persist()
  try {
    batchDF.createOrReplaceTempView("TempStreamingDeviceData")
    // Run the MERGE through the session that owns batchDF: temp views are
    // session-scoped, and the session behind a foreachBatch DataFrame may
    // differ from the outer `spark`, in which case the view would not resolve.
    batchDF.sparkSession.sql(
      " MERGE INTO local.db.deviceData t USING (SELECT * FROM TempStreamingDeviceData) s ON t.deviceId = s.deviceId WHEN MATCHED THEN UPDATE SET t.platformName = s.platformName,t.deviceType = s.deviceType WHEN NOT MATCHED THEN INSERT (externalId,deviceId,tenantId,name,deviceType,platformName,usr_userIdentifier) VALUES(s.externalId,s.deviceId,s.tenantId,s.name,s.deviceType,s.platformName,s.usr_userIdentifier)"
    )
  } finally {
    // Release the cached batch even when the MERGE fails.
    batchDF.unpersist()
  }
}
Ниже приведено исключение
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: org.apache.iceberg.hive.RuntimeMetaException: Failed to connect to Hive Metastore
at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:63)
at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:30)
at org.apache.iceberg.hive.ClientPool.get(ClientPool.java:117)
at org.apache.iceberg.hive.ClientPool.run(ClientPool.java:52)
at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:130)
at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:86)
at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:69)
at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:92)
at org.apache.iceberg.CachingCatalog$$Lambda$2745/0000000063E75980.apply(Unknown Source)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2337)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache$$Lambda$2746/000000006B0ACBE0.apply(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1864)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2335)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2318)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:111)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:54)
at org.apache.iceberg.CachingCatalog.loadTable(CachingCatalog.java:96)
at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:452)
at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:116)
at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:79)
at org.apache.iceberg.spark.SparkSessionCatalog.loadTable(SparkSessionCatalog.java:118)
at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:283)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.loaded$lzycompute$1(Analyzer.scala:1010)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.loaded$1(Analyzer.scala:1010)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$3(Analyzer.scala:1022)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$$Lambda$2744/000000006AFA9F70.apply(Unknown Source)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupRelation(Analyzer.scala:1021)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:977)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:962)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$3(AnalysisHelper.scala:90)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$Lambda$1477/00000000691CFA70.apply(Unknown Source)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$1(AnalysisHelper.scala:90)