为什么 Window 函数(滞后)在 SparkR 中不起作用?

Why are Window functions (Lag) not working in SparkR?

就此而言,我对 Spark 和 SparkR 还很陌生,可能有一些基本问题。

本练习的 objective 是在 SparkR 中实现 window 函数(超前、滞后、排名等)。

我参考了下面的 link 和那里提到的 Databricks post 但无济于事 -

我使用的代码片段:

初始化 sqlContext 并将数据框注册为临时 table 使用 注册温度table

output_data<-SparkR::sql(sqlContext, "select *,lag(type) over(partition by key order by key) as lag_type from input_data")

我们遇到的错误是:

failure: ``union'' expected but `(' found

我发现的另一个建议是使用 Hivecontext 而不是 SQLcontext,因为 SQLcontext 可能不允许所有功能。

在该方法中,初始化 Hivecontext 并尝试 运行 HiveQL 做同样的事情给了我们一个错误说:

cannot find table named input_table

问题: 我们是否需要 运行 一些类似于 registertemptable 的命令以允许 Hivecontext 访问 table?

saveastable 可能是一个选项,但根据我的阅读,它会在 S3 存储中收集数据,而不是在内存中群集。

如有任何帮助,我们将不胜感激! 谢谢!

让我们使用 freeny 数据集准备输入 data.frame

ldf <- freeny

# Extract year and quater
ldf$yr <- as.integer(rownames(ldf))
ldf$qr <- as.integer(4 * (as.numeric(rownames(ldf)) - ldf$yr))

# Clean column names
colnames(ldf) <- gsub("\.", "_",  colnames(ldf))

# Drop a couple of things so output fits nicely in the code box
row.names(ldf) <- NULL
ldf$market_potential <- NULL

head(ldf)


##         y lag_quarterly_revenue price_index income_level yr   qr
## 1 8.79236               8.79636     4.70997      5.82110 1962      1
## 2 8.79137               8.79236     4.70217      5.82558 1962      2
## 3 8.81486               8.79137     4.68944      5.83112 1962      3
## 4 8.81301               8.81486     4.68558      5.84046 1963      0
## 5 8.90751               8.81301     4.64019      5.85036 1963      1
## 6 8.93673               8.90751     4.62553      5.86464 1963      2

Another suggestion which I found was to use a Hivecontext rather than a SQLcontext as SQLcontext might not allow all functionalities.

这是正确的,只有 HiveContext 支持大多数高级功能,而 SQLContext 默认支持。首先,您必须确保您的 Spark 版本已使用 Hive 支持构建。 Spark downloads page 提供的二进制文件确实如此,但如果您从源代码构建,请务必使用 -Phive 标志。

hiveContext <- sparkRHive.init(sc)
sdf <- createDataFrame(hiveContext, ldf)
printSchema(sdf)

## root
##  |-- y: double (nullable = true)
##  |-- lag_quarterly_revenue: double (nullable = true)
##  |-- price_index: double (nullable = true)
##  |-- income_level: double (nullable = true)
##  |-- yr: integer (nullable = true)
##  |-- qr: integer (nullable = true)

initialize sqlContext and register the data frame as a temp table using Registertemptable

也对。为了能够使用 sql 命令,您必须注册一个 table.

registerTempTable(sdf, "sdf")

请记住,DataFrame 绑定到用于创建它的上下文。

head(tables(hiveContext))

##  tableName isTemporary
## 1       sdf        TRUE

head(tables(sqlContext))

## [1] tableName   isTemporary
## <0 rows> (or 0-length row.names)

最后示例查询:

query <- "SELECT yr, qr, y, lag_quarterly_revenue AS old_lag,
          LAG(y) OVER (ORDER BY yr, qr) AS new_lag 
          FROM sdf"

sql(hiveContext, query)

##     yr qr       y old_lag new_lag
## 1 1962  1 8.79236 8.79636      NA
## 2 1962  2 8.79137 8.79236 8.79236
## 3 1962  3 8.81486 8.79137 8.79137
## 4 1963  0 8.81301 8.81486 8.81486
## 5 1963  1 8.90751 8.81301 8.81301
## 6 1963  2 8.93673 8.90751 8.90751