SparkSQL - 滞后功能?
SparkSQL - Lag function?
我在这个 DataBricks post 中看到,SparkSql 中支持 window 函数,特别是我正在尝试使用 lag() window 函数。
我有几行信用卡交易,我已经对它们进行了排序,现在我想遍历这些行,并为每一行显示交易金额,以及当前行金额与前一行的数量。
根据 DataBricks post,我想出了这个查询,但它向我抛出一个异常,我不太明白为什么..
这是在 PySpark 中。tx 是我的数据框,已在注册为临时 table。
test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")
和异常(截断)..
py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found
我真的很感激任何见解,这个功能是相当新的,就现有示例或其他相关 post 而言,没有太多可以继续的地方。
编辑
我也尝试过在没有 SQL 语句的情况下执行此操作,如下所示,但仍然出现错误。我已经将它与 Hive 和 SQLContext 一起使用,并收到相同的错误。
windowSpec = \
Window \
.partitionBy(h_tx_df_ordered['cc_num']) \
.orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])
windowSpec.rowsBetween(-1, 0)
lag_amt = \
(lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])
tx_df_ordered.select(
h_tx_df_ordered['cc_num'],
h_tx_df_ordered['trans_date'],
h_tx_df_ordered['trans_time'],
h_tx_df_ordered['amt'],
lag_amt.alias("prev_amt")).show()
Traceback (most recent call last):
File "rdd_raw_data.py", line 116, in <module>
lag_amt.alias("prev_amt")).show()
File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
jdf = self._jdf.select(self._jcols(*cols))
File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
- 框架规范应以关键字
ROWS
而非 ROW
开头
帧规范需要下限值
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
或UNBOUNDED
关键字
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
LAG
函数根本不接受帧,因此正确的 SQL 延迟查询可能如下所示
SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER (
PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time
) as prev_amt from tx
编辑:
关于 SQL DSL 用法:
正如您在错误消息中看到的那样
Note that, using window functions currently requires a HiveContex
务必使用 HiveContext
而非 SQLContext
初始化 sqlContext
windowSpec.rowsBetween(-1, 0)
什么都不做,但 lag
函数再次不支持帧规范。
我在这个 DataBricks post 中看到,SparkSql 中支持 window 函数,特别是我正在尝试使用 lag() window 函数。
我有几行信用卡交易,我已经对它们进行了排序,现在我想遍历这些行,并为每一行显示交易金额,以及当前行金额与前一行的数量。
根据 DataBricks post,我想出了这个查询,但它向我抛出一个异常,我不太明白为什么..
这是在 PySpark 中。tx 是我的数据框,已在注册为临时 table。
test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")
和异常(截断)..
py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found
我真的很感激任何见解,这个功能是相当新的,就现有示例或其他相关 post 而言,没有太多可以继续的地方。
编辑
我也尝试过在没有 SQL 语句的情况下执行此操作,如下所示,但仍然出现错误。我已经将它与 Hive 和 SQLContext 一起使用,并收到相同的错误。
windowSpec = \
Window \
.partitionBy(h_tx_df_ordered['cc_num']) \
.orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])
windowSpec.rowsBetween(-1, 0)
lag_amt = \
(lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])
tx_df_ordered.select(
h_tx_df_ordered['cc_num'],
h_tx_df_ordered['trans_date'],
h_tx_df_ordered['trans_time'],
h_tx_df_ordered['amt'],
lag_amt.alias("prev_amt")).show()
Traceback (most recent call last):
File "rdd_raw_data.py", line 116, in <module>
lag_amt.alias("prev_amt")).show()
File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
jdf = self._jdf.select(self._jcols(*cols))
File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
- 框架规范应以关键字
ROWS
而非ROW
开头
帧规范需要下限值
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
或
UNBOUNDED
关键字ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
LAG
函数根本不接受帧,因此正确的 SQL 延迟查询可能如下所示SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER ( PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time ) as prev_amt from tx
编辑:
关于 SQL DSL 用法:
正如您在错误消息中看到的那样
Note that, using window functions currently requires a HiveContex
务必使用
HiveContext
而非SQLContext
初始化 windowSpec.rowsBetween(-1, 0)
什么都不做,但lag
函数再次不支持帧规范。
sqlContext