如何使用 sc.textFile("s3n://bucket/*.csv") 将文件名映射到 RDD?
How to map filenames to RDD using sc.textFile("s3n://bucket/*.csv")?
请注意,我必须使用 sc.textFile,但我会接受任何其他答案。
我想做的是简单地将正在处理的文件名添加到 RDD.... 类似的东西:
var rdd = sc.textFile("s3n://bucket/*.csv").map(line=>filename+","+line)
非常感谢!
EDIT2:EDIT1 的解决方案是使用 Hadoop 2.4 或更高版本。但是,我还没有通过使用从站来测试它......等等。但是,一些提到的解决方案仅适用于小型数据集。如果你想使用大数据,你将不得不使用 HadoopRDD
编辑:我尝试了以下方法,但没有用:
:cp symjar/aws-java-sdk-1.9.29.jar
:cp symjar/aws-java-sdk-flow-build-tools-1.9.29.jar
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{S3ObjectSummary, ObjectListing, GetObjectRequest}
import com.amazonaws.auth._
def awsAccessKeyId = "AKEY"
def awsSecretAccessKey = "SKEY"
val hadoopConf = sc.hadoopConfiguration;
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)
var rdd = sc.wholeTextFiles("s3n://bucket/dir/*.csv").map { case (filename, content) => filename }
rdd.count
注意:它正在连接到 S3,这不是问题(因为我已经测试过很多次)。
我得到的错误是:
INFO input.FileInputFormat: Total input paths to process : 4
java.io.FileNotFoundException: File does not exist: /RTLM-918/simple/t1-100.csv
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)
at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:267)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC$$iwC.<init>(<console>:46)
at $iwC$$iwC.<init>(<console>:48)
at $iwC.<init>(<console>:50)
at <init>(<console>:52)
at .<init>(<console>:56)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
只需在映射期间将其作为变量传入(或设置为对象属性)。
sc.textFile("s3n://bucket/"+fn+".csv").map(line=>set_filepath(line,fn))
唯一包含文件名的文本方法是wholeTextFiles
。
sc.wholeTextFiles(path).map { case (filename, content) => ... }
如果您正在处理大数据,那么 HadoopRDD 就是答案。否则,用其他的建议,是行不通的。
代码:
val text = sc.hadoopFile("s3n://.../", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
// Cast to a HadoopRDD
val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]
val fileAndLine = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, iterator) ⇒
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map { tpl ⇒ (file.getPath, tpl._2) }
}
请注意,我必须使用 sc.textFile,但我会接受任何其他答案。
我想做的是简单地将正在处理的文件名添加到 RDD.... 类似的东西:
var rdd = sc.textFile("s3n://bucket/*.csv").map(line=>filename+","+line)
非常感谢!
EDIT2:EDIT1 的解决方案是使用 Hadoop 2.4 或更高版本。但是,我还没有通过使用从站来测试它......等等。但是,一些提到的解决方案仅适用于小型数据集。如果你想使用大数据,你将不得不使用 HadoopRDD
编辑:我尝试了以下方法,但没有用:
:cp symjar/aws-java-sdk-1.9.29.jar
:cp symjar/aws-java-sdk-flow-build-tools-1.9.29.jar
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{S3ObjectSummary, ObjectListing, GetObjectRequest}
import com.amazonaws.auth._
def awsAccessKeyId = "AKEY"
def awsSecretAccessKey = "SKEY"
val hadoopConf = sc.hadoopConfiguration;
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)
var rdd = sc.wholeTextFiles("s3n://bucket/dir/*.csv").map { case (filename, content) => filename }
rdd.count
注意:它正在连接到 S3,这不是问题(因为我已经测试过很多次)。
我得到的错误是:
INFO input.FileInputFormat: Total input paths to process : 4
java.io.FileNotFoundException: File does not exist: /RTLM-918/simple/t1-100.csv
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)
at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:267)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC$$iwC.<init>(<console>:46)
at $iwC$$iwC.<init>(<console>:48)
at $iwC.<init>(<console>:50)
at <init>(<console>:52)
at .<init>(<console>:56)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
只需在映射期间将其作为变量传入(或设置为对象属性)。
sc.textFile("s3n://bucket/"+fn+".csv").map(line=>set_filepath(line,fn))
唯一包含文件名的文本方法是wholeTextFiles
。
sc.wholeTextFiles(path).map { case (filename, content) => ... }
如果您正在处理大数据,那么 HadoopRDD 就是答案。否则,用其他的建议,是行不通的。
代码:
val text = sc.hadoopFile("s3n://.../", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
// Cast to a HadoopRDD
val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]
val fileAndLine = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, iterator) ⇒
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map { tpl ⇒ (file.getPath, tpl._2) }
}