Preserve order based on another variable when calling collect_list using sparklyr
This question is essentially a duplicate of an existing pyspark question, except that I am working in R. The pyspark solution looks solid, but I haven't been able to figure out how to apply collect_list over a window in the same way in sparklyr.
I have a Spark DataFrame with the following structure:
------------------------------
userid |    date    | city
------------------------------
   1   | 2018-08-02 |  A
   1   | 2018-08-03 |  B
   1   | 2018-08-04 |  C
   2   | 2018-08-17 |  G
   2   | 2018-08-20 |  E
   2   | 2018-08-23 |  F
I am trying to group the DataFrame by userid, sort each group by date, and collapse the city column into a concatenation of its values. Desired output:
------------------
userid | cities
------------------
   1   | A, B, C
   2   | G, E, F
The problem is that every method I have tried leaves the "cities" column incorrectly ordered for some users (roughly 3% in a test of 5,000 users).
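For reference, here is a minimal sketch to reproduce the example data as a Spark table (it assumes an existing sparklyr connection sc; the table name my_sdf matches the queries below):

library(sparklyr)
library(dplyr)

# Build the example data locally, then copy it into Spark as "my_sdf".
local_df <- tibble::tribble(
  ~userid, ~date,        ~city,
  1,       "2018-08-02", "A",
  1,       "2018-08-03", "B",
  1,       "2018-08-04", "C",
  2,       "2018-08-17", "G",
  2,       "2018-08-20", "E",
  2,       "2018-08-23", "F"
)
my_sdf <- sdf_copy_to(sc, local_df, "my_sdf", overwrite = TRUE)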
Attempt 1: using dplyr and collect_list.
my_sdf %>%
  dplyr::group_by(userid) %>%
  dplyr::arrange(date) %>%
  dplyr::summarise(cities = paste(collect_list(city), sep = ", "))
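A quick way to see why this is unreliable is to inspect the SQL that dplyr generates (a diagnostic sketch using the standard dplyr::show_query()): the arrange() is typically rendered as an ORDER BY in a subquery, and Spark makes no guarantee that collect_list() sees rows in that order once the GROUP BY shuffle happens.

# Render the translated SQL without executing the full pipeline; the
# ORDER BY from arrange() lands in an inner query, which the GROUP BY
# aggregation is free to ignore when it shuffles rows between executors.
my_sdf %>%
  dplyr::group_by(userid) %>%
  dplyr::arrange(date) %>%
  dplyr::summarise(cities = paste(collect_list(city), sep = ", ")) %>%
  dplyr::show_query()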
Attempt 2: using replyr::gapply, since the operation fits the description of "Grouped-Order-Apply".
get_cities <- . %>%
  summarise(cities = paste(collect_list(city), sep = ", "))

my_sdf %>%
  replyr::gapply(gcolumn = "userid",
                 f = get_cities,
                 ocolumn = "date",
                 partitionMethod = "group_by")
Attempt 3: writing it as a SQL window function.
spark_session(sc) %>%
  sparklyr::invoke("sql",
    "SELECT userid, CONCAT_WS(', ', collect_list(city)) AS cities
     OVER (PARTITION BY userid
           ORDER BY date)
     FROM my_sdf") %>%
  sparklyr::sdf_register() %>%
  sparklyr::sdf_copy_to(sc, ., "my_sdf", overwrite = TRUE)
^ which throws the following error:
Error: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'OVER' expecting <EOF>(line 2, pos 19)

== SQL ==
SELECT userid, conversion_location, CONCAT_WS(' > ', collect_list(channel)) AS path
OVER (PARTITION BY userid, conversion_location
-------------------^^^
ORDER BY occurred_at)
FROM paths_model

(The error output is from my real dataset, hence the different column and table names; the query shape is identical.)
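The parse error is about placement: in Spark SQL the OVER clause must come immediately after the window function, before any alias. A minimal sketch of the corrected shape (note that without an explicit frame the default window stops at the current row, producing running lists, which is why the accepted answer below adds ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):

# OVER must directly follow collect_list(...); the AS alias moves after it.
spark_session(sc) %>%
  sparklyr::invoke("sql",
    "SELECT userid,
            collect_list(city)
              OVER (PARTITION BY userid ORDER BY date) AS cities
     FROM my_sdf")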
OK: so I admit the following solution isn't efficient at all (it uses a for loop and is a lot of code for what seems like a simple task), but I believe it should work:
# install.packages("tidyverse") # if needed
library(tidyverse)

df <- tribble(
  ~userid, ~date,        ~city,
  1,       "2018-08-02", "A",
  1,       "2018-08-03", "B",
  1,       "2018-08-04", "C",
  2,       "2018-08-17", "G",
  2,       "2018-08-20", "E",
  2,       "2018-08-23", "F"
)

# One column per date; spread() sorts the date keys, so each row's
# non-NA cities appear in chronological order, left to right.
cityPerId <- df %>%
  spread(key = date, value = city)

toMutate <- NA
for (i in 1:nrow(cityPerId)) {
  cities <- cityPerId[i, ][2:ncol(cityPerId)] %>%
    t() %>%
    as.vector() %>%
    na.omit()
  collapsedCities <- paste(cities, collapse = ",")
  toMutate <- c(toMutate, collapsedCities)
}
toMutate <- toMutate[2:length(toMutate)]  # drop the initial NA seed

final <- cityPerId %>%
  mutate(cities = toMutate) %>%
  select(userid, cities)
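Note that this runs in local R, not in Spark, so a Spark table would first need to be collect()ed. For the local case there is also a shorter equivalent without the loop (a sketch using plain dplyr):

# Sorting before the grouped paste() is safe here because local dplyr,
# unlike Spark, preserves row order within groups.
final <- df %>%
  arrange(userid, date) %>%
  group_by(userid) %>%
  summarise(cities = paste(city, collapse = ", "))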
Solved! I misunderstood how collect_list() and Spark SQL work together. I hadn't realized a list could be returned; I thought the concatenation had to happen inside the query. The following produces the desired result:
spark_output <- spark_session(sc) %>%
  sparklyr::invoke("sql",
    "SELECT userid, collect_list(city)
       OVER (PARTITION BY userid
             ORDER BY date
             ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
       AS cities
     FROM my_sdf") %>%
  sdf_register() %>%
  group_by(userid) %>%
  # the explicit frame puts the full per-user list on every row,
  # so keep just one row per user
  filter(row_number(userid) == 1) %>%
  ungroup() %>%
  # flatten the collected array into a single string
  mutate(cities = paste(cities, sep = " > ")) %>%
  sdf_register()
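As an aside, an alternative that skips the window plus de-duplication step entirely is to collect (date, city) structs, sort the array, and extract the cities inside a plain aggregation. A sketch, assuming Spark 2.4+ for array_sort and the transform lambda:

# array_sort orders the structs by their first field (date here), so the
# extracted cities come out chronologically no matter how rows were shuffled.
spark_session(sc) %>%
  sparklyr::invoke("sql",
    "SELECT userid,
            concat_ws(', ',
              transform(array_sort(collect_list(struct(date, city))),
                        x -> x.city)) AS cities
     FROM my_sdf
     GROUP BY userid") %>%
  sparklyr::sdf_register()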