spark.lapply and access to SparkDataFrame [SparkR]

I am having trouble accessing a SparkDataFrame inside a function passed to spark.lapply. The code is as follows:

df <- data.frame(x = c(1,2), y = c("a", "b"))

Sys.setenv(SPARK_HOME = "path/spark-2.0.0-bin-hadoop2.7/")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))

sparkR.session(master = "spark://host:7077", 
               appName = "SparkR", 
               sparkHome = Sys.getenv("SPARK_HOME"), 
               sparkConfig = list(spark.driver.memory = "2g"), 
               enableHiveSupport = TRUE)

spark_df <- as.DataFrame(df)
fun_to_distribute <- function(i){
    # take() on the SparkDataFrame is what fails here: the underlying JVM
    # object reference is only valid in the driver's R session, not on the workers
    data <- take(spark_df, 1)$x
    return(data + i)
}

spark.lapply(1:2, fun_to_distribute)

sparkR.session.stop()

Unfortunately, I always get the following error message:

[Stage 1:>                                                          (0 + 2) / 2]17/04/28 01:57:56 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 173.38.82.173): org.apache.spark.SparkException: R computation failed with
 Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
    at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/04/28 01:57:56 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 4, 173.38.82.175): org.apache.spark.SparkException: R computation failed with
 Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
    at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/04/28 01:57:56 WARN TaskSetManager: Lost task 1.3 in stage 1.0 (TID 6, 173.38.82.175): org.apache.spark.SparkException: R computation failed with
 Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
    at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/04/28 01:57:56 ERROR TaskSetManager: Task 1 in stage 1.0 failed 4 times; aborting job
17/04/28 01:57:56 ERROR RBackendHandler: collect on 19 failed
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 6, 173.38.82.175): org.apache.spark.SparkException: R computation failed with
 Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
    at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.s

Of course, I could pass the function complex arguments such as a list, but I would rather upload the data to the Spark cluster once and have every executor access it at runtime.
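
Something along the following lines would presumably work, but it means packing the data into every argument (the names args and fun_with_args are just placeholders):

args <- lapply(1:2, function(i) list(i = i, x = df$x[1]))

fun_with_args <- function(arg) {
    # every list element carries its own copy of the value
    arg$x + arg$i
}

spark.lapply(args, fun_with_args)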

Apache Spark does not allow nested distributed contexts, so you cannot access distributed data structures from inside a task.

Moreover, a SparkDataFrame is not designed for the kind of single-item access that seems to be the desired behavior here. If you want to pass parameters to the function, you should use plain R objects directly.
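
A minimal sketch of that approach, assuming the small data frame df from the question is still available on the driver, is to keep the needed value as a plain R object and let spark.lapply ship it to the executors as part of the function's closure:

# keep the lookup value as a plain R object on the driver
x_first <- df$x[1]

fun_to_distribute <- function(i) {
    # x_first is captured in the closure and serialized with the function,
    # so the workers never have to touch a SparkDataFrame
    x_first + i
}

spark.lapply(1:2, fun_to_distribute)   # expected: list(2, 3)

If a SparkDataFrame is still needed for other operations, it can be created separately on the driver with as.DataFrame(df).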