User-defined aggregate function in SparkR
I have mailing records like this:
  Name MailingID  Timestamp    Event
1 John         1 2014-04-18     Sent
2 John         2 2015-04-21     Sent
3 Mary         1 2015-04-22 Returned
4 Mary         2 2015-04-25     Sent
5 John         1 2015-05-01  Replied
They can be loaded into a SparkR DataFrame:
df <- createDataFrame(sqlContext, data.frame(
  Name = c('John', 'John', 'Mary', 'Mary', 'John'),
  MailingID = c(1, 2, 1, 2, 1),
  Timestamp = c('2014-04-18', '2015-04-21', '2015-04-22', '2015-04-25', '2015-05-01'),
  Event = c('Sent', 'Sent', 'Returned', 'Sent', 'Replied')))
I want to know who replied to any of the 2 most recent mailings sent to him/her. With a summarize helper function and dplyr I can do that locally:
localDf <- collect(df)
library(lubridate)
library(magrittr)
library(dplyr)

# TRUE if any 'Replied' MailingID is among the first Latest_N 'Sent' MailingIDs
# (rows are expected to arrive sorted newest first)
hasRepliedLatest <- function(MailingID, Timestamp, Event, Latest_N) {
  length(intersect(MailingID[Event == 'Replied'], MailingID[Event == 'Sent'][1:Latest_N])) > 0
}

localDf %>%
  arrange(desc(Timestamp)) %>%
  group_by(Name) %>%
  summarize(RepliedLatest = hasRepliedLatest(MailingID, Timestamp, Event, 2))

detach(package:dplyr) # to avoid function conflicts with SparkR
The result is:
  Name RepliedLatest
1 John          TRUE
2 Mary         FALSE
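For anyone who wants to check the per-person logic without dplyr, it can be approximated in base R. This is only a sketch under assumptions: `mails` and `has_replied_latest` are names made up here for illustration, the timestamps are converted to `Date`, and `head()` replaces `[1:Latest_N]` so that a person with fewer than two sent mailings does not yield `NA` entries.

```r
# Local sample data, mirroring the records in the question.
mails <- data.frame(
  Name      = c("John", "John", "Mary", "Mary", "John"),
  MailingID = c(1, 2, 1, 2, 1),
  Timestamp = as.Date(c("2014-04-18", "2015-04-21", "2015-04-22",
                        "2015-04-25", "2015-05-01")),
  Event     = c("Sent", "Sent", "Returned", "Sent", "Replied"),
  stringsAsFactors = FALSE
)

# Hypothetical helper: TRUE if any of the latest_n most recent 'Sent'
# mailings in one person's rows also has a 'Replied' event.
has_replied_latest <- function(group, latest_n) {
  # Sort this person's rows newest first.
  group <- group[order(group$Timestamp, decreasing = TRUE), ]
  latest_sent <- head(group$MailingID[group$Event == "Sent"], latest_n)
  replied <- group$MailingID[group$Event == "Replied"]
  length(intersect(replied, latest_sent)) > 0
}

# Split the rows by person and apply the helper to each group.
result <- sapply(split(mails, mails$Name), has_replied_latest, latest_n = 2)
print(result)
```

On the sample data this gives TRUE for John and FALSE for Mary, the same answer as the dplyr version.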
Now I want to do this in SparkR, that is, on a DataFrame rather than on a local data.frame. So I tried:
df %>%
  arrange(desc(df$Timestamp)) %>%
  group_by(df$Name) %>%
  summarize(RepliedLatest = hasRepliedLatest(df$MailingID, df$Timestamp, df$Event, 2))
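Then I get an error saying that my function is not applicable to the S4 class DataFrame. How can I do this properly in SparkR? Solutions that use SQL queries with a sqlContext created by sparkRHive.init or sparkRSQL.init are also welcome.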
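SparkSQL <= 1.4 doesn't support user-defined aggregate functions, and as far as I know SparkR has no UDFs at all, so unless you use the current development branch or the 1.5 RC, a UDF is not an option.
I'm still not sure I understand your data model and logic, but you can try something like this: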
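# Make df visible to SQL under the name "df"
# (window functions in Spark 1.x assume a HiveContext, i.e. sparkRHive.init)
registerTempTable(df, "df")
# Select the last 2 sent events and all others that occurred in this window
tmp <- sql(sqlContext,
  "SELECT *, SUM(CASE WHEN event = 'Sent' THEN 1 ELSE 0 END) OVER w AS ind
   FROM df WHERE Event IN ('Sent', 'Replied')
   HAVING ind <= 2
   WINDOW w AS (PARTITION BY name ORDER BY DATE(Timestamp) DESC)")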
# Split sent and replied
sent <- tmp %>% filter(tmp$Event == "Sent")
replied <- tmp %>% filter(tmp$Event == "Replied")
registerTempTable(sent, "sent")
registerTempTable(replied, "replied")
# Join and count
sql(sqlContext,
  "SELECT
     sent.name,
     SUM(
       CASE WHEN replied.event IS NOT NULL THEN 1
       ELSE 0 END
     ) > 0 AS repliedlatest
   FROM sent LEFT JOIN replied ON
     sent.name = replied.name AND
     sent.mailingid = replied.mailingid
   -- Not part of the original logic
   WHERE DATE(sent.timestamp) <= DATE(replied.timestamp)
   GROUP BY sent.name") %>% head()
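To sanity-check the two-step idea on a small sample without a Spark cluster, the same steps can be mirrored in base R. This is a rough sketch, not a SparkR solution: `events`, `kept`, `joined` and `hit` are illustrative names, and, unlike the SQL above, the date comparison is applied inside the aggregation rather than in a WHERE clause, so people without any reply still appear with FALSE.

```r
# Local copy of the sample records, with real Date values.
events <- data.frame(
  Name      = c("John", "John", "Mary", "Mary", "John"),
  MailingID = c(1, 2, 1, 2, 1),
  Timestamp = as.Date(c("2014-04-18", "2015-04-21", "2015-04-22",
                        "2015-04-25", "2015-05-01")),
  Event     = c("Sent", "Sent", "Returned", "Sent", "Replied"),
  stringsAsFactors = FALSE
)

# Step 1: keep Sent/Replied rows, ordered newest first within each name.
kept <- events[events$Event %in% c("Sent", "Replied"), ]
kept <- kept[order(kept$Name, -as.numeric(kept$Timestamp)), ]

# Running count of 'Sent' rows per name, playing the role of SUM(...) OVER w.
kept$ind <- ave(as.integer(kept$Event == "Sent"), kept$Name, FUN = cumsum)
kept <- kept[kept$ind <= 2, ]

# Step 2: split into sent and replied, then left-join on name and mailing id.
sent    <- kept[kept$Event == "Sent", ]
replied <- kept[kept$Event == "Replied", ]
joined  <- merge(sent, replied, by = c("Name", "MailingID"),
                 all.x = TRUE, suffixes = c(".sent", ".replied"))

# A row counts as a hit if a reply exists and is not older than the mailing.
joined$hit <- !is.na(joined$Event.replied) &
  joined$Timestamp.sent <= joined$Timestamp.replied

# Step 3: one row per name, TRUE if any of the kept mailings was answered.
result <- aggregate(hit ~ Name, data = joined, FUN = any)
names(result)[2] <- "RepliedLatest"
print(result)
```

Keeping the date check out of the row filter is a design choice of this sketch; filtering on the replied side after a left join would drop every sent row that has no matching reply.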