在 sparklyr 中使用 spark_apply 将加权随机法向量添加到多个 DF 列

Using spark_apply in sparklyr to add weighted random normal vectors to multiple DF columns

我是 sparklyr 的新手,我正在尝试将由另一个向量加权的随机法线向量添加到 spark df 的大量列中。这是我用 mtcars 试过的一个例子。

library(sparklyr)
library(dplyr)
sc1 <- spark_connect(master = "local")

mtc_df = select(mtcars, vs:carb)
mtc_sdf = sdf_copy_to(sc1, mtc_df, name = "mtc_sdf", overwrite = TRUE)

tf_df <- function(df1){
    df1 %>%
        mutate_at(vars(am:carb), funs(. + vs * rnorm(32, 100, 1)))
}

tf_df(mtc_df) # works 

mtc_sdf %>%
    spark_apply(function(d) tf_df(d), memory = TRUE) # doesn't work

我收到以下错误:

Error in file(con, "r") : cannot open the connection
In addition: Warning message:
In file(con, "r") :
  cannot open file 'C:\....\filea54a7656c3_spark.log': Permission denied

我也尝试修改 https://spark.rstudio.com/ 上的示例,但出现了同样的错误。

mtc_sdf %>%
    spark_apply(function(data) {
        data[2:4] + data[1]*rnorm(32*3,100,1)
    })

如有任何帮助,我们将不胜感激。

I'm trying to add random normal vectors weighted by another vector to a large number of columns of a spark df

我建议跳过 spark_apply 并使用 Spark 自己的 randn (which gives ~N(0, 1)):

mtc_sdf %>% mutate_at(vars(am:carb), funs(. + vs * (randn() * 1 + 100)))
# Source:   lazy query [?? x 4]
# Database: spark_connection
      vs        am     gear      carb
   <dbl>     <dbl>    <dbl>     <dbl>
 1     0   1.00000   4.0000   4.00000
 2     0   1.00000   4.0000   4.00000
 3     1 101.36894 103.1954  98.80757
 4     1 100.79066 102.6765 100.91702
 5     0   0.00000   3.0000   2.00000
 6     1 100.07964 103.1568 100.54303
 7     0   0.00000   3.0000   4.00000
 8     1 101.90050 103.0402 101.46825
 9     1  99.63565 103.7781 101.65752
10     1  99.72587 102.3854 105.09205

关于您的代码:

  • 您遇到的问题看起来像是权限问题。请确保 Spark 用户拥有所有必需的权限并且 winutilis 被正确使用。
  • spark_apply一起使用的函数:

    transforms a data frame partition into a data frame.

    因此您不能对行数进行硬编码。您应该使用 rnorm(nrow(df1), 100, 1)).

  • 之类的东西
  • sparklyr 似乎没有正确序列化名称引用的函数,因此您可能必须内联函数或将其包装在包中:

    mtc_sdf %>% 
      spark_apply(function(df) dplyr::mutate_at(
        df, dplyr::vars(am:carb), dplyr::funs(. + vs * rnorm(nrow(df), 100, 1))))
    
    # Source:   table<sparklyr_tmp_34ce7faa2d33> [?? x 4]
    # Database: spark_connection
          vs        am     gear      carb
       <dbl>     <dbl>    <dbl>     <dbl>
     1     0   1.00000   4.0000   4.00000
     2     0   1.00000   4.0000   4.00000
     3     1 100.59678 101.9111 100.99830
     4     1  98.87146 104.8058  99.20102
     5     0   0.00000   3.0000   2.00000
     6     1  99.38243 102.8664 100.37921
     7     0   0.00000   3.0000   4.00000
     8     1  98.99019 103.4996 101.69110
     9     1  99.33687 102.3849 103.38833
    10     1 100.02103 104.9381 102.07139
    # ... with more rows
    

    另请注意,驱动程序中的软件包不会自动附加,因此您必须手动执行或使用完全限定名称引用库函数。