与 S3 存储桶的 Sparklyr 连接抛出错误

Sparklyr connection to S3 bucket throwing up error

我正在尝试从 R sparklyr 连接到 S3 存储桶。 我能够将本地文件读入 spark 上下文。 但是尝试连接 s3 似乎是个问题, 抛出一大堆错误。 这是使用的代码列表。

注意:单个 s3 存储桶有多个 csv 文件 遵循相同的模式。

library( sparklyr )
library( tidyverse )

sparklyr :: spark_install ( version = "2.0.2" , hadoop_version = "2.7" ) 
sparklyr::spark_install( version = "2.0.2" , hadoop_version = "2.7" ) 
Sys.setenv ( AWS_ACCESS_KEY_ID = "xxxx" )
Sys.setenv ( AWS_SECRET_ACCESS_KEY = "xxxx" )
Sys.setenv ( AWS_DEFAULT_REGION = "ap-southeast-1" )

Spark_config <- sparklyr :: spark_config ()
sc <- sparklyr :: spark_connect ( master = "local" ,config = Spark_config)
files = "s3n://temp-sg/MVC"
temp<-spark_read_csv(sc,name = "MVC",path=files,infer_schema = TRUE)
spark_disconnect(sc)

非常感谢任何帮助。

这是使用 s3a://

的错误转储
Error: java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3a:
    at org.apache.hadoop.fs.Path.initialize(Path.java:206)
    at org.apache.hadoop.fs.Path.<init>(Path.java:172)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:211)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.RDD$$anonfun$take.apply(RDD.scala:1307)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
    at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:249)
    at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:245)
    at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:223)
    at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:72)
    at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:157)
    at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:44)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at sparklyr.Invoke$.invoke(invoke.scala:94)
    at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
    at sparklyr.StreamHandler$.read(stream.scala:55)
    at sparklyr.BackendHandler.channelRead0(handler.scala:49)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3a:
    at java.net.URI$Parser.fail(Unknown Source)
    at java.net.URI$Parser.failExpecting(Unknown Source)
    at java.net.URI$Parser.parse(Unknown Source)
    at java.net.URI.<init>(Unknown Source)
    at org.apache.hadoop.fs.Path.initialize(Path.java:203)
    ... 58 more

使用 s3n://

的错误转储
Error: java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3n:
        at org.apache.hadoop.fs.Path.initialize(Path.java:206)
        at org.apache.hadoop.fs.Path.<init>(Path.java:172)
        at org.apache.hadoop.fs.Path.<init>(Path.java:94)
        at org.apache.hadoop.fs.Globber.glob(Globber.java:211)
        at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.RDD$$anonfun$take.apply(RDD.scala:1307)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
        at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:249)
        at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:245)
        at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:223)
        at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:72)
        at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:157)
        at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:44)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at sparklyr.Invoke$.invoke(invoke.scala:94)
        at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
        at sparklyr.StreamHandler$.read(stream.scala:55)
        at sparklyr.BackendHandler.channelRead0(handler.scala:49)
        at sparklyr.BackendHandler.channelRead0(handler.scala:14)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Unknown Source)
    Caused by: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3n:
        at java.net.URI$Parser.fail(Unknown Source)
        at java.net.URI$Parser.failExpecting(Unknown Source)
        at java.net.URI$Parser.parse(Unknown Source)
        at java.net.URI.<init>(Unknown Source)
        at org.apache.hadoop.fs.Path.initialize(Path.java:203)


... 58 more

如果没有看到您的确切错误消息,很难说出到底出了什么问题。但是,我注意到一件事是您使用 s3n 而不是 s3a。这是为什么?我建议改为尝试 s3a

files <- 's3a://temp-sg/MVC'
temp <- spark_read_csv(sc, 
  name = 'MVC', 
  path = files,
  infer_schema = TRUE)

另请参阅 以了解有关两者之间区别的更多详细信息。

已解决问题。 这是代码片段。 注意: 需要验证正确的 JVM 是 运行。我在 64 位机器上使用了 32 位 jvm,因为 64 位机器不起作用。 - 火花版本 - 2.0 - hadoop 版本 - 2.7

# install.packages("devtools")
# devtools::install_github("rstudio/sparklyr") 

library(sparklyr)
library(dplyr)

# conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
# config$spark.executor.memory <- "4g"
sc <- spark_connect(master = "local",config = conf)

#Get spark context  
ctx <- sparklyr::spark_context(sc)

#Use below to set the java spark context
jsc <- invoke_static(  
  sc,
  "org.apache.spark.api.java.JavaSparkContext",
  "fromSparkContext",
  ctx
)
#set the s3 configs:  
hconf <- jsc %>% invoke("hadoopConfiguration")  
hconf %>% invoke("set","fs.s3a.access.key", "xxxx")  
hconf %>% invoke("set","fs.s3a.secret.key", "xxxx")  

# check if spar session is active
sparklyr::spark_connection_is_open(sc=sc)


small_file = "s3a://temp-sg/MVC"

temp<-spark_read_csv(sc,name = "MVC",path=small_file,infer_schema = TRUE)
spark_disconnect(sc)