How to properly use the SQL HAVING clause with a COUNT column?

I have the following table.

|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|ujmEBvifdJM6h6RLv...|   0|2013-05-07 04:34:36|    1|Q1sbwvVQXV2734tPg...|  1.0|Total bill for th...|     6|hG7b0MtEbXx5QzbzE...|
|NZnhc2sEQy3RmzKTZ...|   0|2017-01-14 21:30:33|    0|GJXCdrto3ASJOqKeV...|  5.0|I *adore* Travis ...|     0|yXQM5uF2jS6es16SJ...|
|WTqjgwHlXbSFevF32...|   0|2016-11-09 20:09:03|    0|2TzJjDVDEuAW6MR5V...|  5.0|I have to say tha...|     3|n6-Gk65cPZL6Uz8qR...|

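I am trying to:
- count the text and user_id for each text (this should give me the number of occurrences of each text)
- group by text
- average the 'cool' factor
- keep only the result entries whose count is above 5
- and sort the items by the average 'cool' factor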

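I keep getting an error message. Since this is in pyspark, it is a bit hard to read. However, I suspect my SQL skills are the issue. The code runs without the HAVING COUNT(text,user_id) > 5 line, so that is where I think the problem is. Any suggestions would be great! Thanks.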

query = """    
SELECT 
        text, 
        COUNT(text,user_id) AS unique_count , 
        AVG(cool) AS avg_cool 
    FROM review
    GROUP BY text
    HAVING COUNT(text,user_id) > 5
    ORDER BY AVG(cool) DESC
"""
spark.sql(query).show(50)


Desired output

+--------------------+------------+--------+
|                text|unique_count|avg_cool|
+--------------------+------------+--------+
|In retrospect, I ...|          16|   506.0|
|The Wynn Hotel. O...|          14|   347.0|
|Nutter Butter, Ch...|          12|   290.0|
|It's perky! It's ...|          10|   268.0|

pyspark error

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o24.sql.
: org.apache.spark.sql.AnalysisException: cannot resolve '`cool`' given input columns: [review.text, unique_count, avg_cool]; line 9 pos 17;
'Sort ['AVG('cool) DESC NULLS LAST], true
+- Project [text#190, unique_count#386L, avg_cool#387]
   +- Filter (count(text#190, user_id#192)#392L > cast(5 as bigint))
      +- Aggregate [text#190], [text#190, count(text#190, user_id#192) AS unique_count#386L, avg(cool#185L) AS avg_cool#387, count(text#190, user_id#192) AS count(text#190, user_id#192)#392L]
         +- SubqueryAlias `review`
            +- Relation[business_id#184,cool#185L,date#186,funny#187L,review_id#188,stars#189,text#190,useful#191L,user_id#192] json

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:111)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:108)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:280)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:280)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$mapChild(TreeNode.scala:297)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply.apply(TreeNode.scala:356)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:356)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:328)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:93)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:93)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:105)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:105)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression(QueryPlan.scala:104)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:116)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform.apply(QueryPlan.scala:121)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:121)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:126)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:93)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:108)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:86)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck.apply(Analyzer.scala:108)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck.apply(Analyzer.scala:105)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input-13-10aa09933a72> in <module>
      9     ORDER BY AVG(cool) DESC
     10 """
---> 11 spark.sql(query).show(50)

/usr/local/spark/python/pyspark/sql/session.py in sql(self, sqlQuery)
    765         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    766         """
--> 767         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    768 
    769     @since(2.0)

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "cannot resolve '`cool`' given input columns: [review.text, unique_count, avg_cool]; line 9 pos 17;\n'Sort ['AVG('cool) DESC NULLS LAST], true\n+- Project [text#190, unique_count#386L, avg_cool#387]\n   +- Filter (count(text#190, user_id#192)#392L > cast(5 as bigint))\n      +- Aggregate [text#190], [text#190, count(text#190, user_id#192) AS unique_count#386L, avg(cool#185L) AS avg_cool#387, count(text#190, user_id#192) AS count(text#190, user_id#192)#392L]\n         +- SubqueryAlias `review`\n            +- Relation[business_id#184,cool#185L,date#186,funny#187L,review_id#188,stars#189,text#190,useful#191L,user_id#192] json\n"

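I'm not sure whether it is the HAVING clause or the ORDER BY. I know that some databases (Hive, for example) are finicky about expressions and the aliases used in them.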

Try using the column aliases:

SELECT text,  COUNT(text,user_id) AS unique_count, AVG(cool) AS avg_cool 
FROM review
GROUP BY text
HAVING unique_count > 5
ORDER BY avg_cool DESC;
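
For what it's worth, the analysis error in the question is raised on the Sort node ('Sort ['AVG('cool) DESC NULLS LAST], "line 9 pos 17"), which would be consistent with ordering by the alias instead of repeating AVG(cool). Below is a minimal sketch of the alias-based query shape that runs without a Spark installation. It makes several assumptions that are not in the original post: SQLite (via Python's sqlite3) stands in for Spark, the rows are made-up sample data, and COUNT(user_id) replaces the two-argument COUNT(text,user_id) because SQLite's count accepts a single argument. Alias resolution rules differ between engines, so this only illustrates the query shape and does not prove how Spark behaves.

```python
import sqlite3

# Hypothetical miniature of the `review` table: only the columns the query uses.
rows = (
    [("great place", f"user{i}", 10) for i in range(6)]        # 6 rows, every cool = 10
    + [("never again", f"user{i}", 2 * i) for i in range(7)]   # 7 rows, cool = 0, 2, ..., 12
    + [("it was ok", f"user{i}", 100) for i in range(3)]       # 3 rows, below the threshold
)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE review (text TEXT, user_id TEXT, cool INTEGER)")
conn.executemany("INSERT INTO review VALUES (?, ?, ?)", rows)

# Same shape as the suggested query: filter and sort through the aliases
# rather than repeating the aggregate expressions.
query = """
SELECT text,
       COUNT(user_id) AS unique_count,
       AVG(cool)      AS avg_cool
FROM review
GROUP BY text
HAVING unique_count > 5
ORDER BY avg_cool DESC
"""

result = conn.execute(query).fetchall()
for row in result:
    print(row)
```

With this sample data, the group with only three rows is dropped by HAVING, and the remaining two groups come back ordered by their average 'cool' value.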