r sparklyr spark_apply Ошибка: org.apache.spark.sql.AnalysisException: ссылка 'id' является неоднозначной
Я пытаюсь использовать spark_apply на кластере искр для вычисления kmeans на данных, сгруппированных по двум столбцам. Данные запрашиваются из Hive и выглядят так
> samplog1
# Source: lazy query [?? x 6]
# Database: spark_connection
id time1 latitude longitude timestamp hr
<chr> <dbl> <dbl> <dbl> <chr> <int>
1 fffc68e3-866e-4be5-b1bc-5d21b89622ae 1.509338e+12 1.373545 104.1265 2017-10-30 04:29:59 4
2 fffc7412-deb1-4587-9c22-29ca833865ed 1.509332e+12 5.701320 117.4892 2017-10-30 02:49:47 2
3 fffd16d5-83f1-4ea1-95de-34b1fcad392b 1.509338e+12 5.334012 100.2172 2017-10-30 04:25:44 4
4 fffc68e3-866e-4be5-b1bc-5d21b89622ae 1.509338e+12 1.373545 104.1265 2017-10-30 04:29:44 4
5 fffd16d5-83f1-4ea1-95de-34b1fcad392b 1.509332e+12 5.334061 100.2173 2017-10-30 02:58:30 2
6 fffd16d5-83f1-4ea1-95de-34b1fcad392b 1.509339e+12 5.334012 100.2172 2017-10-30 04:55:41 4
7 fffc7412-deb1-4587-9c22-29ca833865ed 1.509339e+12 5.729879 117.5787 2017-10-30 04:49:07 4
8 fffc68e3-866e-4be5-b1bc-5d21b89622ae 1.509340e+12 1.373545 104.1265 2017-10-30 05:02:08 5
9 fffc7412-deb1-4587-9c22-29ca833865ed 1.509325e+12 5.701320 117.4892 2017-10-30 00:53:12 0
10 fffc7412-deb1-4587-9c22-29ca833865ed 1.509336e+12 5.670300 117.4990 2017-10-30 04:08:12 4
Функция, которую я передаю в spark_apply, находится ниже. Предполагается взять группу данных it по id и hr, вычислить kmeans для каждой группы, вычислить, какую долю строк представляет каждая группа (достоверность) и вернуть центр с наибольшим числом членов и доверительностью:
kms <- function(idLogs){
tryCatch({
km <- sparklyr::ml_kmeans(idLogs, centers = 3, features = c("latitude","longitude"))
km1 <- copy_to(sc, km$centers, overwrite = T)
cluster <- sdf_predict(km)
clustCounts <- cluster %>% group_by(prediction) %>%
tally %>%
mutate(conf=n/sum(n),
prediction=prediction+1)
clustCounts1 <- merge(clustCounts, km1, by.x=3, by.y=0)
clustCounts1 <- copy_to(sc, clustCounts1, overwrite = T)
clustCounts2 <- clustCounts1 %>% filter(., conf==max(conf)) %>% select(latitude, longitude, conf)
return(data.frame(clustCounts2))
}, error = function(e) {
return(
data.frame(string_id = c(0), string_categories = c("error"))
)
})
}
и я призываю это как
spark_apply(x = samplog1, f = kms, group_by = c("id","hr"))
Тем не менее, я получаю сообщение об ошибке по поводу неоднозначного столбца 'id'.
Error: org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id#1569, id#1571.;
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:171)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4$$anonfun$26.apply(Analyzer.scala:470)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4$$anonfun$26.apply(Analyzer.scala:470)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:470)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:466)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:108)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:122)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:122)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:466)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:346)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:346)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:327)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:37)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:37)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:35)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2141)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:721)
at org.apache.spark.sql.DataFrame.selectExpr(DataFrame.scala:754)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at sparklyr.Invoke$.invoke(invoke.scala:102)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
at sparklyr.StreamHandler$.read(stream.scala:62)
at sparklyr.BackendHandler.channelRead0(handler.scala:52)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToM
Из объяснений, которые я видел, это могло произойти при соединении фреймов данных, где используется общий идентификатор. В этом случае я не присоединяюсь ни к одному фрейму данных. Единственным возможным виновником была бы функция слияния, но составляющие кадры данных не имеют столбца id. Я новичок в sparklyr и spark_apply, поэтому ценю, что мог написать свою функцию совершенно неправильно. Я публикую весь сценарий ниже на случай, если он может выявить другие проблемы. Я надеюсь, что это не загромождает вещи:
Sys.setenv(HIVE_HOME="/opt/cloudera/parcels/CDH/lib/hive/")
kms <- function(idLogs){
tryCatch({
km <- sparklyr::ml_kmeans(idLogs, centers = 3, features = c("latitude","longitude"))
km1 <- copy_to(sc, km$centers, overwrite = T)
cluster <- sdf_predict(km)
clustCounts <- cluster %>% group_by(prediction) %>%
tally %>%
mutate(conf=n/sum(n),
prediction=prediction+1)
clustCounts1 <- merge(clustCounts, km1, by.x=3, by.y=0)
clustCounts1 <- copy_to(sc, clustCounts1, overwrite = T)
clustCounts2 <- clustCounts1 %>% filter(., conf==max(conf)) %>% select(latitude, longitude, conf)
return(data.frame(clustCounts2))
}, error = function(e) {
return(
data.frame(string_id = c(0), string_categories = c("error"))
)
})
}
sc <- spark_connect(master = "yarn-client",
version="1.6.0",
spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')
tbl_change_db(sc, "clustergps")
samplog <- tbl(sc, "part6")
samplog <- mutate(samplog, timestamp = from_unixtime(time1/1000))
samplog <- mutate(samplog, hr = hour(timestamp))
samplog1 <- samplog %>% filter(id == 'fffd16d5-83f1-4ea1-95de-34b1fcad392b' |
id == 'fffc7412-deb1-4587-9c22-29ca833865ed' |
id == 'fffc68e3-866e-4be5-b1bc-5d21b89622ae')
likelyLocs <- spark_apply(samplog1, kms, group_by = list("id","hr"))
1 ответ
Так что просто чтобы дать некоторые отзывы об этом. Я смог решить эту проблему, установив параметр "columns" в spark_apply, который определяет имена выходных столбцов. Я нашел установку его для любой строки / вектора значения строки сработало.