Sparklyr:如何计算 2 个 Spark 表之间的相关系数?
Sparklyr: how to calculate correlation coefficient between 2 Spark tables?
我有这 2 个 Spark tables:
simx
x0: num 1.00 2.00 3.00 ...
x1: num 2.00 3.00 4.00 ...
...
x788: num 2.00 3.00 4.00 ...
和
simy
y0: num 1.00 2.00 3.00 ...
在两个 table 中,每列具有相同数量的值。 table x
和 y
分别保存到句柄 simX_tbl
和 simY_tbl
中。实际数据量很大,可能达到40GB。
我想用 simy
计算 simx
中每一列的相关系数(比方说 cor(x0, y0, 'pearson')
)。
我到处搜索,我认为没有任何现成的 cor
函数,所以我正在考虑使用相关公式本身(就像 mentioned in here)。
基于 中的一个很好的解释,我认为使用 mutate_all
或 mutate_each
不是很有效并且为更大的数据大小提供 C stack error
,所以我考虑使用 invoke
而不是直接从 Spark
调用函数。
到目前为止我设法到达这里:
exprs <- as.list(paste0("sum(", colnames(simX_tbl),")"))
corr_result <- simX_tbl%>%
spark_dataframe() %>%
invoke("selectExpr", exprs) %>%
invoke("toDF", as.list(colnames(simX_tbl))) %>%
sdf_register("corr_result")
计算simx
中每一列的sum
。但是后来,我意识到我还需要计算 simy
table 并且我不知道如何将两个 table 交互在一起(比如,访问 simy
而操纵 simx
).
有没有更好的方法来计算相关性?或者可能只是如何与其他 Spark table.
互动
我的 Spark 版本是 1.6.0
编辑:
我尝试使用来自 dplyr
:
的 combine
函数
xy_df <- simX_tbl %>%
as.data.frame %>%
combine(as.data.frame(simY_tbl)) %>%
# convert both table to dataframe, then combine.
# It will become list, so need to convert to dataframe again
as.data.frame
xydata <- copy_to(sc, xy_df, "xydata") #copy the dataframe into Spark table
但我不确定这是否是一个好的解决方案,因为:
- 需要加载到 R 内部的数据框中,我认为这对于大数据来说不实用
当尝试 head
句柄 xydata
时,列名变成所有值的连接
xydata %>% head
Source: query [6 x 790]
Database: spark connection master=yarn-client app=sparklyr local=FALSE
c_1_67027262134984_2_44919662134984_1_85728542134984_1_49317262134984_
1 1.670273
2 2.449197
3 1.857285
4 1.493173
5 1.576857
6 -5.672155
就我个人而言,我会通过回到 来解决它。仅供记录,输入数据已使用 CSV reader:
加载
df <- spark_read_csv(
sc, path = path, name = "simData", delimiter = " ",
header = "false", infer_schema = "false"
) %>% rename(y = `_c0`, xs = `_c1`)
看起来或多或少像这样:
y xs
<chr> <chr>
1 21.66 2.643227,1.2698358,2.6338573,1.8812188,3.8708665
2 35.15 3.422151,-0.59515584,2.4994135,-0.19701914,4.0771823
3 15.22 2.8302398,1.9080592,-0.68780196,3.1878228,4.6600842
现在我们不要将数据拆分成多个 tables,而是一起处理这两个部分:
exprs <- lapply(
0:(n - 1),
function(i) paste("CAST(xs[", i, "] AS double) AS x", i, sep=""))
df %>%
# Convert to native Spark
spark_dataframe() %>%
# Split and select xs, but retain y
invoke("selectExpr", list("y", "split(xs, ',') AS xs")) %>%
invoke("selectExpr", c("CAST(y AS DOUBLE)", exprs)) %>%
# Register table so we can access it from dplyr
invoke("registerTempTable", "exploded_df")
并应用 summarize_each
:
tbl(sc, "exploded_df") %>% summarize_each(funs(corr(., y)), starts_with("x"))
Source: query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE
x0 x1 x2 x3 x4
<dbl> <dbl> <dbl> <dbl> <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591
快速完整性检查(y
和 x0
、y
和 x4
之间的相关性):
cor(c(21.66, 35.15, 15.22), c(2.643227, 3.422151, 2.8302398))
[1] 0.8503358
cor(c(21.66, 35.15, 15.22), c(3.8708665, 4.0771823, 4.6600842))
[1] -0.5571591
当然可以先将数据居中:
exploded <- tbl(sc, "exploded_df")
avgs <- summarize_all(exploded, funs(mean)) %>% as.data.frame()
center_exprs <- as.list(paste(colnames(exploded ),"-", avgs))
transmute_(exploded, .dots = setNames(center_exprs, colnames(exploded))) %>%
summarize_each(funs(corr(., y)), starts_with("x"))
但是it doesn't affect the result:
Source: query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE
x0 x1 x2 x3 x4
<dbl> <dbl> <dbl> <dbl> <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591
如果 transmute_
和 summarize_each
都引起了一些问题,我们可以将居中和相关性直接推入 Spark:
#Centering
center_exprs <- as.list(paste(colnames(exploded ),"-", avgs))
exploded %>%
spark_dataframe() %>%
invoke("selectExpr", center_exprs) %>%
invoke("toDF", as.list(colnames(exploded))) %>%
invoke("registerTempTable", "centered")
centered <- tbl(sc, "centered")
#Correlation
corr_exprs <- lapply(
0:(n - 1),
function(i) paste("corr(y, x", i, ") AS x", i, sep=""))
centered %>%
spark_dataframe() %>%
invoke("selectExpr", corr_exprs) %>%
invoke("registerTempTable", "corrs")
tbl(sc, "corrs")
Source: query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE
x0 x1 x2 x3 x4
<dbl> <dbl> <dbl> <dbl> <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591
中级table当然不是必须的,可以在我们从数组中提取数据的同时应用。
我有这 2 个 Spark tables:
simx
x0: num 1.00 2.00 3.00 ...
x1: num 2.00 3.00 4.00 ...
...
x788: num 2.00 3.00 4.00 ...
和
simy
y0: num 1.00 2.00 3.00 ...
在两个 table 中,每列具有相同数量的值。 table x
和 y
分别保存到句柄 simX_tbl
和 simY_tbl
中。实际数据量很大,可能达到40GB。
我想用 simy
计算 simx
中每一列的相关系数(比方说 cor(x0, y0, 'pearson')
)。
我到处搜索,我认为没有任何现成的 cor
函数,所以我正在考虑使用相关公式本身(就像 mentioned in here)。
基于 mutate_all
或 mutate_each
不是很有效并且为更大的数据大小提供 C stack error
,所以我考虑使用 invoke
而不是直接从 Spark
调用函数。
到目前为止我设法到达这里:
exprs <- as.list(paste0("sum(", colnames(simX_tbl),")"))
corr_result <- simX_tbl%>%
spark_dataframe() %>%
invoke("selectExpr", exprs) %>%
invoke("toDF", as.list(colnames(simX_tbl))) %>%
sdf_register("corr_result")
计算simx
中每一列的sum
。但是后来,我意识到我还需要计算 simy
table 并且我不知道如何将两个 table 交互在一起(比如,访问 simy
而操纵 simx
).
有没有更好的方法来计算相关性?或者可能只是如何与其他 Spark table.
互动我的 Spark 版本是 1.6.0
编辑:
我尝试使用来自 dplyr
:
combine
函数
xy_df <- simX_tbl %>%
as.data.frame %>%
combine(as.data.frame(simY_tbl)) %>%
# convert both table to dataframe, then combine.
# It will become list, so need to convert to dataframe again
as.data.frame
xydata <- copy_to(sc, xy_df, "xydata") #copy the dataframe into Spark table
但我不确定这是否是一个好的解决方案,因为:
- 需要加载到 R 内部的数据框中,我认为这对于大数据来说不实用
当尝试
head
句柄xydata
时,列名变成所有值的连接xydata %>% head Source: query [6 x 790] Database: spark connection master=yarn-client app=sparklyr local=FALSE
c_1_67027262134984_2_44919662134984_1_85728542134984_1_49317262134984_
1 1.670273
2 2.449197
3 1.857285
4 1.493173
5 1.576857
6 -5.672155
就我个人而言,我会通过回到
df <- spark_read_csv(
sc, path = path, name = "simData", delimiter = " ",
header = "false", infer_schema = "false"
) %>% rename(y = `_c0`, xs = `_c1`)
看起来或多或少像这样:
y xs
<chr> <chr>
1 21.66 2.643227,1.2698358,2.6338573,1.8812188,3.8708665
2 35.15 3.422151,-0.59515584,2.4994135,-0.19701914,4.0771823
3 15.22 2.8302398,1.9080592,-0.68780196,3.1878228,4.6600842
现在我们不要将数据拆分成多个 tables,而是一起处理这两个部分:
exprs <- lapply(
0:(n - 1),
function(i) paste("CAST(xs[", i, "] AS double) AS x", i, sep=""))
df %>%
# Convert to native Spark
spark_dataframe() %>%
# Split and select xs, but retain y
invoke("selectExpr", list("y", "split(xs, ',') AS xs")) %>%
invoke("selectExpr", c("CAST(y AS DOUBLE)", exprs)) %>%
# Register table so we can access it from dplyr
invoke("registerTempTable", "exploded_df")
并应用 summarize_each
:
tbl(sc, "exploded_df") %>% summarize_each(funs(corr(., y)), starts_with("x"))
Source: query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE
x0 x1 x2 x3 x4
<dbl> <dbl> <dbl> <dbl> <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591
快速完整性检查(y
和 x0
、y
和 x4
之间的相关性):
cor(c(21.66, 35.15, 15.22), c(2.643227, 3.422151, 2.8302398))
[1] 0.8503358
cor(c(21.66, 35.15, 15.22), c(3.8708665, 4.0771823, 4.6600842))
[1] -0.5571591
当然可以先将数据居中:
exploded <- tbl(sc, "exploded_df")
avgs <- summarize_all(exploded, funs(mean)) %>% as.data.frame()
center_exprs <- as.list(paste(colnames(exploded ),"-", avgs))
transmute_(exploded, .dots = setNames(center_exprs, colnames(exploded))) %>%
summarize_each(funs(corr(., y)), starts_with("x"))
但是it doesn't affect the result:
Source: query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE
x0 x1 x2 x3 x4
<dbl> <dbl> <dbl> <dbl> <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591
如果 transmute_
和 summarize_each
都引起了一些问题,我们可以将居中和相关性直接推入 Spark:
#Centering
center_exprs <- as.list(paste(colnames(exploded ),"-", avgs))
exploded %>%
spark_dataframe() %>%
invoke("selectExpr", center_exprs) %>%
invoke("toDF", as.list(colnames(exploded))) %>%
invoke("registerTempTable", "centered")
centered <- tbl(sc, "centered")
#Correlation
corr_exprs <- lapply(
0:(n - 1),
function(i) paste("corr(y, x", i, ") AS x", i, sep=""))
centered %>%
spark_dataframe() %>%
invoke("selectExpr", corr_exprs) %>%
invoke("registerTempTable", "corrs")
tbl(sc, "corrs")
Source: query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE
x0 x1 x2 x3 x4
<dbl> <dbl> <dbl> <dbl> <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591
中级table当然不是必须的,可以在我们从数组中提取数据的同时应用。