Logistic Regression with SparkR using S3 data

Hello, I am trying to reproduce some more examples with SparkR, but when I try to use data from S3 I get some errors. This is the code I am running in RStudio on an EMR cluster:

rm(list = ls())  # clear the workspace
library(SparkR)
# Initialize Spark context
sc <- sparkR.init(master = "yarn-client",
                  appName = "logistic",
                  sparkEnvir = list(spark.executor.memory = "5g"))

D <- 8  # number of feature dimensions (each line holds 1 label + 8 features)

readPartition <- function(part) {
  part <- as.vector(part, mode = "character")
  part <- strsplit(part, "\t", fixed = TRUE)
  # matrix() fills column-wise by default, so byrow = TRUE is needed to keep
  # one input line per matrix row (9 columns: label plus 8 features)
  list(matrix(as.numeric(unlist(part)), ncol = 9, byrow = TRUE))
}
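
# A quick local sanity check of readPartition on two made-up lines
# (hypothetical values that just follow the label-plus-8-features layout):
sample_lines <- c("1\t0.5\t-0.2\t1.1\t0\t2.3\t-1.7\t0.4\t0.9",
                  "-1\t0.1\t0.8\t-0.6\t1.5\t-0.3\t0.7\t2\t-1.2")
readPartition(sample_lines)  # a list holding one 2 x 9 numeric matrix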

# Read data points and convert each partition to a matrix
points <- cache(lapplyPartition(textFile(sc, 's3://mybucket/Data.txt'), readPartition))


# Initialize w to a random value
w <- runif(n=D, min = -1, max = 1)
cat("Initial w: ", w, "\n")

# Compute the logistic regression gradient for a matrix of data points.
# Labels are assumed to be coded as -1/+1 in the first column of the file.
gradient <- function(partition) {
  partition <- partition[[1]]
  Y <- partition[, 1]  # point labels (first column of input file)
  X <- partition[, -1] # point coordinates

  # For each point (x, y), grad = -y * x / (1 + exp(y * w.x)), written here
  # as (logit - 1) * y * x with logit = 1 / (1 + exp(-y * w.x))
  dot <- X %*% w
  logit <- 1 / (1 + exp(-Y * dot))
  grad <- t(X) %*% ((logit - 1) * Y)
  list(grad)
}

iterations <- 10  # not set anywhere in the original snippet; any small count will do

for (i in 1:iterations) {
  cat("On iteration ", i, "\n")
  w <- w - reduce(lapplyPartition(points, gradient), "+")
}

cat("Final w: ", w, "\n")

I get the following error when reading the data into points:

 points <- cache(lapplyPartition(textFile(sc, 's3://mybucket/Data.txt'), readPartition))

collect on 65 failed with java.lang.reflect.InvocationTargetException
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
    at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
    at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:186)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at edu.berkeley.cs.amplab.sparkr.BaseRRDD.getPartitions(RRDD.scala:31)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
    at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:312)
    at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
    ... 25 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    ... 47 more
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
    at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
    at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
    at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
    ... 52 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1811)
    at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)
    ... 54 more
Error: returnStatus == 0 is not TRUE
22 Apr 2015 18:29:28 [rsession-rstudio] ERROR r error 4 (R code execution error) [errormsg=Error: returnStatus == 0 is not TRUE
]; OCCURRED AT: core::Error r::exec::<unnamed>::evaluateExpressionsUnsafe(SEXPREC*, SEXPREC*, SEXPREC**, r::sexp::Protect*) /root/rstudio/src/cpp/r/RExec.cpp:145; LOGGED FROM: core::json::Value session::modules::environment::varToJson(SEXPREC*, const r::sexp::Variable&) /root/rstudio/src/cpp/session/modules/environment/EnvironmentUtils.cpp:134
> points <- cache(lapplyPartition(textFile(sc, 's3://datascience.hadoop.spark.r/data/ModelData.txt'), readPartition))
collect on 75 failed with java.lang.reflect.InvocationTargetException
[... same stack trace as above, ending in the same root cause ...]
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
Error: returnStatus == 0 is not TRUE
> file=textFile(sc, 's3://datascience.hadoop.spark.r/data/ModelData.txt',9)
collect on 80 failed with java.lang.reflect.InvocationTargetException
[... same stack trace as above, ending in the same root cause ...]
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
Error: returnStatus == 0 is not TRUE

Just wondering if anyone can help?

Please check whether the jar spark-assembly-x.x.x-hadoopX.X.jar is on your classpath.

The error states:

Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.

So you need to add the Hadoop LZO jar to the Spark classpath. If this is AWS EMR, look for hadoop-*lzo*.jar in /home/hadoop/share/. Below is a minimal sketch of wiring that in from SparkR; the search pattern and jar location are assumptions, since the exact file name varies by EMR AMI.
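
# Find the Hadoop LZO jar on the master node (location/name are assumptions)
lzo_jar <- list.files("/home/hadoop/share", pattern = "lzo.*\\.jar$",
                      recursive = TRUE, full.names = TRUE)[1]

# Put the jar on both the driver and executor classpaths before init
sc <- sparkR.init(master = "yarn-client",
                  appName = "logistic",
                  sparkEnvir = list(spark.executor.memory = "5g",
                                    spark.driver.extraClassPath = lzo_jar,
                                    spark.executor.extraClassPath = lzo_jar))

The same two properties can also be set once in the cluster's conf/spark-defaults.conf instead of per session.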