函数 returns Spark 中的一个空列表
Function returns an empty List in Spark
下面是获取压缩文件中文件名列表的代码
def getListOfFilesInRepo(zipFileRDD : RDD[(String,PortableDataStream)]) : (List[String]) = {
val zipInputStream = zipFileRDD.values.map(x => new ZipInputStream(x.open))
val filesInZip = new ArrayBuffer[String]()
var ze : Option[ZipEntry] = None
zipInputStream.foreach(stream =>{
do{
ze = Option(stream.getNextEntry);
ze.foreach{ze =>
if(ze.getName.endsWith("java") && !ze.isDirectory()){
var fileName:String = ze.getName.substring(ze.getName.lastIndexOf("/")+1,ze.getName.indexOf(".java"))
filesInZip += fileName
}
}
stream.closeEntry()
} while(ze.isDefined)
println(filesInZip.toList.length) // print 889 (correct)
})
println(filesInZip.toList.length) // print 0 (WHY..?)
(filesInZip.toList)
}
我按以下方式执行上面的代码:
scala> val zipFileRDD = sc.binaryFiles("./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip")
zipFileRDD: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = ./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip BinaryFileRDD[17] at binaryFiles at <console>:25
scala> getListOfFilesInRepo(zipRDD)
889
0
res12: List[String] = List()
为什么我没有得到 889 而是得到 0?
发生这种情况是因为 filesInZip
不在工作人员之间共享。 foreach
在 filesInZip
的本地副本上运行,当它完成时,这个副本被简单地丢弃并被垃圾收集。如果您想保留结果,您应该使用转换(很可能是 flatMap
)和 return 收集的聚合值。
def listFiles(stream: PortableDataStream): TraversableOnce[String] = ???
zipInputStream.flatMap(listFiles)
了解更多
下面是获取压缩文件中文件名列表的代码
def getListOfFilesInRepo(zipFileRDD : RDD[(String,PortableDataStream)]) : (List[String]) = {
val zipInputStream = zipFileRDD.values.map(x => new ZipInputStream(x.open))
val filesInZip = new ArrayBuffer[String]()
var ze : Option[ZipEntry] = None
zipInputStream.foreach(stream =>{
do{
ze = Option(stream.getNextEntry);
ze.foreach{ze =>
if(ze.getName.endsWith("java") && !ze.isDirectory()){
var fileName:String = ze.getName.substring(ze.getName.lastIndexOf("/")+1,ze.getName.indexOf(".java"))
filesInZip += fileName
}
}
stream.closeEntry()
} while(ze.isDefined)
println(filesInZip.toList.length) // print 889 (correct)
})
println(filesInZip.toList.length) // print 0 (WHY..?)
(filesInZip.toList)
}
我按以下方式执行上面的代码:
scala> val zipFileRDD = sc.binaryFiles("./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip")
zipFileRDD: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = ./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip BinaryFileRDD[17] at binaryFiles at <console>:25
scala> getListOfFilesInRepo(zipRDD)
889
0
res12: List[String] = List()
为什么我没有得到 889 而是得到 0?
发生这种情况是因为 filesInZip
不在工作人员之间共享。 foreach
在 filesInZip
的本地副本上运行,当它完成时,这个副本被简单地丢弃并被垃圾收集。如果您想保留结果,您应该使用转换(很可能是 flatMap
)和 return 收集的聚合值。
def listFiles(stream: PortableDataStream): TraversableOnce[String] = ???
zipInputStream.flatMap(listFiles)
了解更多