mclapply 和 spark_read_parquet
mclapply and spark_read_parquet
作为论坛的活跃用户,我相对较新,但首先要感谢大家的贡献,因为多年来我一直在寻找答案...
今天,我有一个问题没有人解决或者我找不到...
作为测试系统的一部分,我正在尝试从 s3 (AWS) 并行读取文件到 spark(本地计算机)。我用过 mclapply,但是当设置超过 1 个核心时,它会失败...
示例:(相同的代码在使用一个内核时有效,但在使用 2 个内核时失败)
new_rdd_global <- mclapply(seq(file_paths), function(i){spark_read_parquet(sc, name=paste0("rdd_",i), path=file_paths[i])}, mc.cores = 1)
new_rdd_global <- mclapply(seq(file_paths), function(i){spark_read_parquet(sc, name=paste0("rdd_",i), path=file_paths[i])}, mc.cores = 2)
Warning message:
In mclapply(seq(file_paths), function(i) { :
all scheduled cores encountered errors in user code
有什么建议吗???
提前致谢。
只需通过 1 spark_read_parquet()
调用将所有内容读入一个 table,这样 Spark 就会为您处理并行化。如果您需要单独的 tables,您可以在之后拆分它们,假设有一列告诉您数据来自哪个文件。通常,在将 Spark 与 R.
结合使用时,您不需要使用 mcapply()
作为论坛的活跃用户,我相对较新,但首先要感谢大家的贡献,因为多年来我一直在寻找答案...
今天,我有一个问题没有人解决或者我找不到...
作为测试系统的一部分,我正在尝试从 s3 (AWS) 并行读取文件到 spark(本地计算机)。我用过 mclapply,但是当设置超过 1 个核心时,它会失败...
示例:(相同的代码在使用一个内核时有效,但在使用 2 个内核时失败)
new_rdd_global <- mclapply(seq(file_paths), function(i){spark_read_parquet(sc, name=paste0("rdd_",i), path=file_paths[i])}, mc.cores = 1)
new_rdd_global <- mclapply(seq(file_paths), function(i){spark_read_parquet(sc, name=paste0("rdd_",i), path=file_paths[i])}, mc.cores = 2) Warning message: In mclapply(seq(file_paths), function(i) { : all scheduled cores encountered errors in user code
有什么建议吗???
提前致谢。
只需通过 1 spark_read_parquet()
调用将所有内容读入一个 table,这样 Spark 就会为您处理并行化。如果您需要单独的 tables,您可以在之后拆分它们,假设有一列告诉您数据来自哪个文件。通常,在将 Spark 与 R.
mcapply()