为什么 Window 函数(滞后)在 SparkR 中不起作用?
Why are Window functions (Lag) not working in SparkR?
就此而言,我对 Spark 和 SparkR 还很陌生,可能有一些基本问题。
本练习的 objective 是在 SparkR 中实现 window 函数(超前、滞后、排名等)。
我参考了下面的 link 和那里提到的 Databricks post 但无济于事 -
我使用的代码片段:
初始化 sqlContext 并将数据框注册为临时 table 使用
注册温度table
output_data<-SparkR::sql(sqlContext, "select *,lag(type) over(partition by key order by key) as lag_type from input_data")
我们遇到的错误是:
failure: ``union'' expected but `(' found
我发现的另一个建议是使用 Hivecontext 而不是 SQLcontext,因为 SQLcontext 可能不允许所有功能。
在该方法中,初始化 Hivecontext 并尝试 运行 HiveQL
做同样的事情给了我们一个错误说:
cannot find table named input_table
问题: 我们是否需要 运行 一些类似于 registertemptable 的命令以允许 Hivecontext 访问 table?
saveastable 可能是一个选项,但根据我的阅读,它会在 S3 存储中收集数据,而不是在内存中群集。
如有任何帮助,我们将不胜感激!
谢谢!
让我们使用 freeny
数据集准备输入 data.frame
。
ldf <- freeny
# Extract year and quater
ldf$yr <- as.integer(rownames(ldf))
ldf$qr <- as.integer(4 * (as.numeric(rownames(ldf)) - ldf$yr))
# Clean column names
colnames(ldf) <- gsub("\.", "_", colnames(ldf))
# Drop a couple of things so output fits nicely in the code box
row.names(ldf) <- NULL
ldf$market_potential <- NULL
head(ldf)
## y lag_quarterly_revenue price_index income_level yr qr
## 1 8.79236 8.79636 4.70997 5.82110 1962 1
## 2 8.79137 8.79236 4.70217 5.82558 1962 2
## 3 8.81486 8.79137 4.68944 5.83112 1962 3
## 4 8.81301 8.81486 4.68558 5.84046 1963 0
## 5 8.90751 8.81301 4.64019 5.85036 1963 1
## 6 8.93673 8.90751 4.62553 5.86464 1963 2
Another suggestion which I found was to use a Hivecontext rather than a SQLcontext as SQLcontext might not allow all functionalities.
这是正确的,只有 HiveContext
支持大多数高级功能,而 SQLContext
默认支持。首先,您必须确保您的 Spark 版本已使用 Hive 支持构建。 Spark downloads page 提供的二进制文件确实如此,但如果您从源代码构建,请务必使用 -Phive
标志。
hiveContext <- sparkRHive.init(sc)
sdf <- createDataFrame(hiveContext, ldf)
printSchema(sdf)
## root
## |-- y: double (nullable = true)
## |-- lag_quarterly_revenue: double (nullable = true)
## |-- price_index: double (nullable = true)
## |-- income_level: double (nullable = true)
## |-- yr: integer (nullable = true)
## |-- qr: integer (nullable = true)
initialize sqlContext and register the data frame as a temp table using Registertemptable
也对。为了能够使用 sql
命令,您必须注册一个 table.
registerTempTable(sdf, "sdf")
请记住,DataFrame 绑定到用于创建它的上下文。
head(tables(hiveContext))
## tableName isTemporary
## 1 sdf TRUE
head(tables(sqlContext))
## [1] tableName isTemporary
## <0 rows> (or 0-length row.names)
最后示例查询:
query <- "SELECT yr, qr, y, lag_quarterly_revenue AS old_lag,
LAG(y) OVER (ORDER BY yr, qr) AS new_lag
FROM sdf"
sql(hiveContext, query)
## yr qr y old_lag new_lag
## 1 1962 1 8.79236 8.79636 NA
## 2 1962 2 8.79137 8.79236 8.79236
## 3 1962 3 8.81486 8.79137 8.79137
## 4 1963 0 8.81301 8.81486 8.81486
## 5 1963 1 8.90751 8.81301 8.81301
## 6 1963 2 8.93673 8.90751 8.90751
就此而言,我对 Spark 和 SparkR 还很陌生,可能有一些基本问题。
本练习的 objective 是在 SparkR 中实现 window 函数(超前、滞后、排名等)。
我参考了下面的 link 和那里提到的 Databricks post 但无济于事 -
我使用的代码片段:
初始化 sqlContext 并将数据框注册为临时 table 使用 注册温度table
output_data<-SparkR::sql(sqlContext, "select *,lag(type) over(partition by key order by key) as lag_type from input_data")
我们遇到的错误是:
failure: ``union'' expected but `(' found
我发现的另一个建议是使用 Hivecontext 而不是 SQLcontext,因为 SQLcontext 可能不允许所有功能。
在该方法中,初始化 Hivecontext 并尝试 运行 HiveQL 做同样的事情给了我们一个错误说:
cannot find table named input_table
问题: 我们是否需要 运行 一些类似于 registertemptable 的命令以允许 Hivecontext 访问 table?
saveastable 可能是一个选项,但根据我的阅读,它会在 S3 存储中收集数据,而不是在内存中群集。
如有任何帮助,我们将不胜感激! 谢谢!
让我们使用 freeny
数据集准备输入 data.frame
。
ldf <- freeny
# Extract year and quater
ldf$yr <- as.integer(rownames(ldf))
ldf$qr <- as.integer(4 * (as.numeric(rownames(ldf)) - ldf$yr))
# Clean column names
colnames(ldf) <- gsub("\.", "_", colnames(ldf))
# Drop a couple of things so output fits nicely in the code box
row.names(ldf) <- NULL
ldf$market_potential <- NULL
head(ldf)
## y lag_quarterly_revenue price_index income_level yr qr
## 1 8.79236 8.79636 4.70997 5.82110 1962 1
## 2 8.79137 8.79236 4.70217 5.82558 1962 2
## 3 8.81486 8.79137 4.68944 5.83112 1962 3
## 4 8.81301 8.81486 4.68558 5.84046 1963 0
## 5 8.90751 8.81301 4.64019 5.85036 1963 1
## 6 8.93673 8.90751 4.62553 5.86464 1963 2
Another suggestion which I found was to use a Hivecontext rather than a SQLcontext as SQLcontext might not allow all functionalities.
这是正确的,只有 HiveContext
支持大多数高级功能,而 SQLContext
默认支持。首先,您必须确保您的 Spark 版本已使用 Hive 支持构建。 Spark downloads page 提供的二进制文件确实如此,但如果您从源代码构建,请务必使用 -Phive
标志。
hiveContext <- sparkRHive.init(sc)
sdf <- createDataFrame(hiveContext, ldf)
printSchema(sdf)
## root
## |-- y: double (nullable = true)
## |-- lag_quarterly_revenue: double (nullable = true)
## |-- price_index: double (nullable = true)
## |-- income_level: double (nullable = true)
## |-- yr: integer (nullable = true)
## |-- qr: integer (nullable = true)
initialize sqlContext and register the data frame as a temp table using Registertemptable
也对。为了能够使用 sql
命令,您必须注册一个 table.
registerTempTable(sdf, "sdf")
请记住,DataFrame 绑定到用于创建它的上下文。
head(tables(hiveContext))
## tableName isTemporary
## 1 sdf TRUE
head(tables(sqlContext))
## [1] tableName isTemporary
## <0 rows> (or 0-length row.names)
最后示例查询:
query <- "SELECT yr, qr, y, lag_quarterly_revenue AS old_lag,
LAG(y) OVER (ORDER BY yr, qr) AS new_lag
FROM sdf"
sql(hiveContext, query)
## yr qr y old_lag new_lag
## 1 1962 1 8.79236 8.79636 NA
## 2 1962 2 8.79137 8.79236 8.79236
## 3 1962 3 8.81486 8.79137 8.79137
## 4 1963 0 8.81301 8.81486 8.81486
## 5 1963 1 8.90751 8.81301 8.81301
## 6 1963 2 8.93673 8.90751 8.90751