点燃整个纺织品 - 许多小文件
spark whole textiles - many small files
我想通过 spark 将许多小文本文件提取到 parquet。目前,我使用 wholeTextFiles
并另外执行一些解析。
更准确地说,这些小文本文件是 ESRi ASCII 网格文件,每个文件的最大大小约为 400kb。 GeoTools 用于解析它们,如下所述。
您认为有任何优化的可能性吗?也许是为了避免创建不必要的对象?或者更好地处理小文件的东西。我想知道只获取文件的路径并手动读取它们而不是使用 String -> ByteArrayInputStream
.
是否更好
case class RawRecords(path: String, content: String)
case class GeometryId(idPath: String, value: Double, geo: String)
@transient lazy val extractor = new PolygonExtractionProcess()
@transient lazy val writer = new WKTWriter()
def readRawFiles(path: String, parallelism: Int, spark: SparkSession) = {
import spark.implicits._
spark.sparkContext
.wholeTextFiles(path, parallelism)
.toDF("path", "content")
.as[RawRecords]
.mapPartitions(mapToSimpleTypes)
}
def mapToSimpleTypes(iterator: Iterator[RawRecords]): Iterator[GeometryId] = iterator.flatMap(r => {
val extractor = new PolygonExtractionProcess()
// http://docs.geotools.org/latest/userguide/library/coverage/arcgrid.html
val readRaster = new ArcGridReader(new ByteArrayInputStream(r.content.getBytes(StandardCharsets.UTF_8))).read(null)
// TODO maybe consider optimization of known size instead of using growable data structure
val vectorizedFeatures = extractor.execute(readRaster, 0, true, null, null, null, null).features
val result: collection.Seq[GeometryId] with Growable[GeometryId] = mutable.Buffer[GeometryId]()
while (vectorizedFeatures.hasNext) {
val vectorizedFeature = vectorizedFeatures.next()
val geomWKTLineString = vectorizedFeature.getDefaultGeometry match {
case g: Geometry => writer.write(g)
}
val geomUserdata = vectorizedFeature.getAttribute(1).asInstanceOf[Double]
result += GeometryId(r.path, geomUserdata, geomWKTLineString)
}
result
})
我有建议:
- 使用
wholeTextFile
-> mapPartitions
-> 转换为数据集。为什么?如果您在 Dataset 上创建 mapPartitions
,则所有行都从内部格式转换为对象 - 它会导致额外的序列化。
- 运行 Java 任务控制并对您的应用程序进行采样。它将显示方法的所有编译和执行时间
- 也许你可以使用
binaryFiles
,它会给你Stream
,这样你就可以解析它而无需额外阅读mapPartitions
我想通过 spark 将许多小文本文件提取到 parquet。目前,我使用 wholeTextFiles
并另外执行一些解析。
更准确地说,这些小文本文件是 ESRi ASCII 网格文件,每个文件的最大大小约为 400kb。 GeoTools 用于解析它们,如下所述。
您认为有任何优化的可能性吗?也许是为了避免创建不必要的对象?或者更好地处理小文件的东西。我想知道只获取文件的路径并手动读取它们而不是使用 String -> ByteArrayInputStream
.
case class RawRecords(path: String, content: String)
case class GeometryId(idPath: String, value: Double, geo: String)
@transient lazy val extractor = new PolygonExtractionProcess()
@transient lazy val writer = new WKTWriter()
def readRawFiles(path: String, parallelism: Int, spark: SparkSession) = {
import spark.implicits._
spark.sparkContext
.wholeTextFiles(path, parallelism)
.toDF("path", "content")
.as[RawRecords]
.mapPartitions(mapToSimpleTypes)
}
def mapToSimpleTypes(iterator: Iterator[RawRecords]): Iterator[GeometryId] = iterator.flatMap(r => {
val extractor = new PolygonExtractionProcess()
// http://docs.geotools.org/latest/userguide/library/coverage/arcgrid.html
val readRaster = new ArcGridReader(new ByteArrayInputStream(r.content.getBytes(StandardCharsets.UTF_8))).read(null)
// TODO maybe consider optimization of known size instead of using growable data structure
val vectorizedFeatures = extractor.execute(readRaster, 0, true, null, null, null, null).features
val result: collection.Seq[GeometryId] with Growable[GeometryId] = mutable.Buffer[GeometryId]()
while (vectorizedFeatures.hasNext) {
val vectorizedFeature = vectorizedFeatures.next()
val geomWKTLineString = vectorizedFeature.getDefaultGeometry match {
case g: Geometry => writer.write(g)
}
val geomUserdata = vectorizedFeature.getAttribute(1).asInstanceOf[Double]
result += GeometryId(r.path, geomUserdata, geomWKTLineString)
}
result
})
我有建议:
- 使用
wholeTextFile
->mapPartitions
-> 转换为数据集。为什么?如果您在 Dataset 上创建mapPartitions
,则所有行都从内部格式转换为对象 - 它会导致额外的序列化。 - 运行 Java 任务控制并对您的应用程序进行采样。它将显示方法的所有编译和执行时间
- 也许你可以使用
binaryFiles
,它会给你Stream
,这样你就可以解析它而无需额外阅读mapPartitions