r sparklyr spark_apply Error: org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous

I am trying to use spark_apply on a Spark cluster to compute k-means on data grouped by two columns. The data is queried from Hive and looks like this:

> samplog1
# Source:   lazy query [?? x 6]
# Database: spark_connection
                                     id        time1 latitude longitude           timestamp    hr
                                  <chr>        <dbl>    <dbl>     <dbl>               <chr> <int>
 1 fffc68e3-866e-4be5-b1bc-5d21b89622ae 1.509338e+12 1.373545  104.1265 2017-10-30 04:29:59     4
 2 fffc7412-deb1-4587-9c22-29ca833865ed 1.509332e+12 5.701320  117.4892 2017-10-30 02:49:47     2
 3 fffd16d5-83f1-4ea1-95de-34b1fcad392b 1.509338e+12 5.334012  100.2172 2017-10-30 04:25:44     4
 4 fffc68e3-866e-4be5-b1bc-5d21b89622ae 1.509338e+12 1.373545  104.1265 2017-10-30 04:29:44     4
 5 fffd16d5-83f1-4ea1-95de-34b1fcad392b 1.509332e+12 5.334061  100.2173 2017-10-30 02:58:30     2
 6 fffd16d5-83f1-4ea1-95de-34b1fcad392b 1.509339e+12 5.334012  100.2172 2017-10-30 04:55:41     4
 7 fffc7412-deb1-4587-9c22-29ca833865ed 1.509339e+12 5.729879  117.5787 2017-10-30 04:49:07     4
 8 fffc68e3-866e-4be5-b1bc-5d21b89622ae 1.509340e+12 1.373545  104.1265 2017-10-30 05:02:08     5
 9 fffc7412-deb1-4587-9c22-29ca833865ed 1.509325e+12 5.701320  117.4892 2017-10-30 00:53:12     0
10 fffc7412-deb1-4587-9c22-29ca833865ed 1.509336e+12 5.670300  117.4990 2017-10-30 04:08:12     4

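The function I pass to spark_apply is below. It is supposed to group the data by id and hr, compute k-means for each group, compute the fraction of rows each cluster represents (the confidence), and return the center with the most members along with its confidence: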

kms <- function(idLogs){
  tryCatch({
  km <- sparklyr::ml_kmeans(idLogs, centers = 3, features = c("latitude","longitude"))

  km1 <- copy_to(sc, km$centers, overwrite = T)

  cluster <-  sdf_predict(km)

  clustCounts <- cluster %>% group_by(prediction) %>% 
    tally  %>%
    mutate(conf=n/sum(n),
           prediction=prediction+1)

  clustCounts1 <- merge(clustCounts, km1, by.x=3, by.y=0)

  clustCounts1 <- copy_to(sc, clustCounts1, overwrite = T)

  clustCounts2 <- clustCounts1 %>% filter(., conf==max(conf)) %>% select(latitude, longitude, conf)

  return(data.frame(clustCounts2))
  }, error = function(e) {
    return(
      data.frame(string_id = c(0), string_categories = c("error"))
    )
  })
}

I call it as follows:

spark_apply(x = samplog1, f = kms, group_by = c("id","hr"))

However, I get an error about an ambiguous 'id' column:

Error: org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id#1569, id#1571.;
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:171)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$$anonfun$applyOrElse$$anonfun.apply(Analyzer.scala:470)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$$anonfun$applyOrElse$$anonfun.apply(Analyzer.scala:470)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$$anonfun$applyOrElse.applyOrElse(Analyzer.scala:470)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$$anonfun$applyOrElse.applyOrElse(Analyzer.scala:466)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:335)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:335)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:281)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp(QueryPlan.scala:108)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform.apply(QueryPlan.scala:122)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:122)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:127)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply.applyOrElse(Analyzer.scala:466)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply.applyOrElse(Analyzer.scala:346)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:346)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:327)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$$anonfun$apply.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$$anonfun$apply.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:37)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:37)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:35)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2141)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:721)
at org.apache.spark.sql.DataFrame.selectExpr(DataFrame.scala:754)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at sparklyr.Invoke$.invoke(invoke.scala:102)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
at sparklyr.StreamHandler$.read(stream.scala:62)
at sparklyr.BackendHandler.channelRead0(handler.scala:52)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToM

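From the explanations I have seen, this can happen when joining data frames that share an id column. In this case I am not joining any data frames. The only possible culprit is the merge call, but neither of the data frames involved has an id column. I am new to sparklyr and spark_apply, so I realize I may have written my function completely wrong. I have posted the entire script below in case it reveals other problems. I hope it doesn't clutter things up: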

library(sparklyr)
library(dplyr)

Sys.setenv(HIVE_HOME="/opt/cloudera/parcels/CDH/lib/hive/")

kms <- function(idLogs){
  tryCatch({
  km <- sparklyr::ml_kmeans(idLogs, centers = 3, features = c("latitude","longitude"))

  km1 <- copy_to(sc, km$centers, overwrite = T)

  cluster <-  sdf_predict(km)

  clustCounts <- cluster %>% group_by(prediction) %>% 
    tally  %>%
    mutate(conf=n/sum(n),
           prediction=prediction+1)

  clustCounts1 <- merge(clustCounts, km1, by.x=3, by.y=0)

  clustCounts1 <- copy_to(sc, clustCounts1, overwrite = T)

  clustCounts2 <- clustCounts1 %>% filter(., conf==max(conf)) %>% select(latitude, longitude, conf)

  return(data.frame(clustCounts2))
  }, error = function(e) {
    return(
      data.frame(string_id = c(0), string_categories = c("error"))
    )
  })
}

sc <- spark_connect(master = "yarn-client", 
                    version="1.6.0", 
                    spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')

tbl_change_db(sc, "clustergps")

samplog <- tbl(sc, "part6")

samplog <- mutate(samplog, timestamp = from_unixtime(time1/1000))

samplog <- mutate(samplog, hr = hour(timestamp))

samplog1 <- samplog %>% filter(id == 'fffd16d5-83f1-4ea1-95de-34b1fcad392b' |
                   id == 'fffc7412-deb1-4587-9c22-29ca833865ed' |
                   id == 'fffc68e3-866e-4be5-b1bc-5d21b89622ae')


likelyLocs <- spark_apply(samplog1, kms, group_by = list("id","hr"))

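Just to provide some feedback on this: I was able to resolve the issue by setting spark_apply's "columns" argument, which defines the names of the output columns. I found that setting it to any string or vector of strings works.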
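
For anyone landing here, a minimal sketch of what such a call might look like. spark_apply hands each group to the function as a plain R data.frame, so this sketch swaps in base R's kmeans() for ml_kmeans(). That substitution, the helper names top_cluster, out_cols and run_on_spark, and the handling of very small groups are my own assumptions, not the original code. It also assumes sparklyr prepends the group_by columns to the result by itself, so "columns" lists only what the function returns; that is worth verifying on your sparklyr version.

```r
# Hypothetical worker: receives one (id, hr) group as an ordinary data.frame.
# Uses stats::kmeans (base R) rather than ml_kmeans; this is an assumption
# about the intended computation, not the original poster's function.
top_cluster <- function(df) {
  pts <- df[, c("latitude", "longitude")]

  # kmeans() needs no more centers than distinct points, and its default
  # algorithm needs fewer centers than rows.
  k <- min(3, nrow(unique(pts)), nrow(pts) - 1)
  if (k < 1) {
    # Single-row group: return that point with full confidence.
    return(data.frame(latitude = pts$latitude[1],
                      longitude = pts$longitude[1],
                      conf = 1))
  }

  km <- stats::kmeans(pts, centers = k)
  best <- which.max(km$size)  # index of the cluster with the most members

  data.frame(latitude = km$centers[best, "latitude"],
             longitude = km$centers[best, "longitude"],
             conf = km$size[best] / nrow(pts),
             row.names = NULL)
}

# Names of the columns returned by top_cluster, passed via `columns` so that
# sparklyr does not have to infer the output schema.
out_cols <- c("latitude", "longitude", "conf")

# Hypothetical wrapper; `tbl` is a Spark table such as samplog1 above.
run_on_spark <- function(tbl) {
  sparklyr::spark_apply(tbl, top_cluster,
                        group_by = c("id", "hr"),
                        columns = out_cols)
}
```

With the connection and samplog1 from the question, the call would then be likelyLocs <- run_on_spark(samplog1). The worker can be tried on a local data.frame first, which makes it easier to debug than running it inside Spark.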