Sparklyr:如何根据列使 Spark table 居中?

Sparklyr: how to center a Spark table based on column?

我有一个 Spark table:

simx
x0: num 1.00 2.00 3.00 ...
x1: num 2.00 3.00 4.00 ...
...
x788: num 2.00 3.00 4.00 ...

和 R 环境中一个名为 simX_tbl 的句柄连接到此 simx table。

我想对此 table 进行居中处理,即用列均值减去每一列。比如计算x0 - mean(x0),等等。

到目前为止,我的最大努力是:

meanX <- simX_tbl %>% summarise_all(funs("mean")) %>% collect()

x_centered <-  simX_tbl

for(i in 1:789) {
  colName <- paste0("x", i-1)
  colName2 <- lazyeval::interp(~ a - b, a = as.name(colName), b = as.double(meanX[i]))
  x_centered <- x_centered %>% mutate_(.dots = setNames( list(colName2) , colName) )
}

当我限制 for 循环几次迭代 (1:5) 时,这实际上有效 x_centered %>% head 结果是正确的。但是当我这样做 789 次迭代时,当我尝试 head it:

时会出现此错误
Error: C stack usage  7969412 is too close to the limit

以下是我已经尝试过的显示 C 堆栈使用错误的输出方法:

x_centered %>% head #show first 6 rows

x_centered %>% select_("x0") #select first column only

x_centered %>% sdf_register("x_centered") #register as table

x_centered %>% spark_dataframe() %>% tbl(sc, "x_centered") #also register as table

spark_write_csv(x_centered, path = "hdfs/path/here") #write as csv

稍后我需要为每一列计算相关系数,但我认为我不能输出这个错误。

有什么方法可以正确/有效地居中吗?我阅读了 this question 关于提高 Cstack 限制的内容,但我不认为这是一个解决方案,因为数据非常大,并且存在再次超出限制的风险。实际数据是40GB+,我目前使用的数据只是一个小样本(789列x 10000行)。

Spark 版本为 1.6.0

编辑:使标题更清晰,添加尝试过的输出方法

您只需使用 mutate_each / muate_all

library(dplyr)

df <- data.frame(x=c(1, 2, 3), y = c(-4, 5, 6), z = c(42, 42, 42))
sdf <- copy_to(sc, df, overwrite=TRUE)

mutate_all(sdf, funs(. - mean(.)))

Source:   query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE

      x         y     z
  <dbl>     <dbl> <dbl>
1    -1 -6.333333     0
2     0  2.666667     0
3     1  3.666667     0

但看起来它被扩展为非常低效(对于大型数据集不可接受)window 函数应用程序。使用更详细的解决方案可能会更好:

avgs <- summarize_all(sdf, funs(mean)) %>% as.data.frame()

exprs <- as.list(paste(colnames(sdf),"-", avgs))

sdf %>%  
  spark_dataframe() %>% 
  invoke("selectExpr", exprs) %>% 
  invoke("toDF", as.list(colnames(sdf))) %>% 
  invoke("registerTempTable", "centered")

tbl(sc, "centered")
Source:   query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE

      x         y     z
  <dbl>     <dbl> <dbl>
1    -1 -6.333333     0
2     0  2.666667     0
3     1  3.666667     0

它不像 dplyr 方法那么漂亮,但与前者不同的是它做了一件明智的事情。

如果你想跳过所有 invokes 你可以使用 dplyr 来做同样的事情:

transmute_(sdf, .dots = setNames(exprs, colnames(sdf)))
Source:   query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE

      x         y     z
  <dbl>     <dbl> <dbl>
1    -1 -6.333333     0
2     0  2.666667     0
3     1  3.666667     0

执行计划:

辅助函数(另见 dbplyr::remote_query 物理计划):

optimizedPlan <- function(df) {
  df %>% 
    spark_dataframe() %>%
    invoke("queryExecution") %>%
    invoke("optimizedPlan")
}

dplyr版本:

mutate_all(sdf, funs(. - mean(.))) %>% optimizedPlan()
<jobj[190]>
  class org.apache.spark.sql.catalyst.plans.logical.Project
  Project [x#2877, y#2878, (z#1123 - _we0#2894) AS z#2879]
+- Window [avg(z#1123) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2894]
   +- Project [x#2877, (y#1122 - _we0#2892) AS y#2878, z#1123]
      +- Window [avg(y#1122) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2892]
         +- Project [(x#1121 - _we0#2890) AS x#2877, z#1123, y#1122]
            +- Window [avg(x#1121) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2890]
               +- Project [y#1122, z#1123, x#1121]
                  +- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
                     :  +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>

Spark 解决方案:

tbl(sc, "centered") %>% optimizedPlan()
<jobj[204]>
  class org.apache.spark.sql.catalyst.plans.logical.Project
  Project [(x#1121 - 2.0) AS x#2339, (y#1122 - 2.33333333333333) AS y#2340, (z#1123 - 42.0) AS z#2341]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
   :  +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>

dplyr优化:

transmute_(sdf, .dots = setNames(exprs, colnames(sdf))) %>% optimizedPlan()
<jobj[272]>
  class org.apache.spark.sql.catalyst.plans.logical.Project
  Project [(x#1121 - 2.0) AS x#4792, (y#1122 - 2.33333333333333) AS y#4793, (z#1123 - 42.0) AS z#4794]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
   :  +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>

备注:

Spark SQL 在处理宽数据集方面不是很好。使用核心 Spark,您通常将功能组合到一个 Vector Column 中,Spark 提供了许多可用于对 Vector 数据进行操作的转换器。